Efficient string replace in a stream


I need to replace a string in a streaming HTTP response. The naive way of doing that would be:

using var reader = new StreamReader(input, leaveOpen: true);
var original = await reader.ReadToEndAsync();
var replaced = original.Replace(oldValue, newValue, StringComparison.InvariantCultureIgnoreCase); // "new" is a reserved keyword, so the parameters are named oldValue/newValue
await output.WriteAsync(Encoding.UTF8.GetBytes(replaced));

This is very resource and memory intensive since the complete response must be read into memory before replacing the strings.

I've been looking at System.IO.Pipelines and the PipeReader. While this does give me efficient access to the stream, it operates on bytes, which makes the conversion to char problematic when working with UTF-8, because a multi-byte character can be split across two reads.
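
As a point of reference for the byte-to-char concern: a System.Text.Decoder keeps state between calls, so bytes of a character that was cut off at the end of one chunk are completed by the next chunk. A minimal sketch, assuming the chunks arrive in order (ChunkDecoding and DecodeChunks are made-up names for illustration):

```csharp
using System;
using System.Collections.Generic;
using System.Text;

public static class ChunkDecoding
{
    // Decodes UTF-8 chunks in order. The Decoder buffers an incomplete
    // trailing byte sequence until a later chunk completes it.
    public static string DecodeChunks(IEnumerable<byte[]> chunks)
    {
        Decoder decoder = Encoding.UTF8.GetDecoder();
        var text = new StringBuilder();

        foreach (byte[] chunk in chunks)
        {
            // Worst-case number of chars this chunk can produce.
            var chars = new char[Encoding.UTF8.GetMaxCharCount(chunk.Length)];
            int count = decoder.GetChars(chunk, 0, chunk.Length, chars, 0, flush: false);
            text.Append(chars, 0, count);
        }

        // Note: this sketch never flushes, so an incomplete sequence
        // at the very end of the input is silently dropped.
        return text.ToString();
    }
}
```

For example, the UTF-8 bytes of "café" split into { 0x63, 0x61, 0x66, 0xC3 } and { 0xA9 } still decode to "café", although the two bytes of "é" arrive in different chunks.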

One method I've seen is to use ReadLineAsync on the StreamReader, but I cannot know whether the stream will contain any newlines.

Another method I've seen is to use a queue, but even that seems clumsy.

So my question is: what is the best way to replace text in a stream without reading the full stream into memory?


There are 2 answers

BionicCode On BEST ANSWER

If you have an infinite Stream like a NetworkStream, replacing string tokens on the fly makes a lot of sense. But because you are processing a finite stream whose complete content you need anyway, such filtering doesn't pay off because of its performance impact.

My argument is that you would have to use buffering. The buffer size of course limits the number of characters you can process at once. Assuming you use a ring buffer or some kind of queue, you would have to remove one character to append a new one. This leads to a lot of drawbacks compared to processing the complete content.

Pros & Cons

| Full-content search & replace | Buffered (real-time) search & replace |
| --- | --- |
| Single pass over the full range | Multiple passes over the same range |
| Advanced search (e.g. backtracking, look-ahead etc.) | Because characters get dropped at the start of the buffer, the search engine loses context |
| Because we have the full context available, we won't ever miss a match | Because characters get dropped at the start of the buffer, there will be edge cases where the searched token does not fit into the buffer. Of course, we can resize the buffer to a length = n * token_length. However, this limitation impacts performance as the optimal buffer size is now constrained by the token size. The worst case is that the token length equals the length of the stream. Based on the search algorithm we would have to keep input in the buffer until we match all key words and then discard the matched part. If there is no match the buffer size would equal the size of the stream. In streaming scenarios e.g. network socket, we don't even know the stream size in advance (expect it to be infinite). In this case we have to define a "random" buffer size and risk to lose matches that span across multiple buffer reads. It makes sense to evaluate the expected input and search keys for the worst-case scenarios and switch to a full-context search to get rid of the buffer management overhead. Just to highlight that the efficiency of a real-time search is very much limited by the input and the search keys (expected scenario). It can't be faster than full-context search. It potentially consumes less memory under optimal circumstances. |
| Don't have to worry about an optimal buffer size to maximize efficiency | Buffer size becomes more and more important the longer the source content is. Too small buffers result in too many buffer shifts and too many passes over the same range. Note, the key cost for the original search & replace task is the string search and the replace/modification, so it makes sense to reduce the number of comparisons to an absolute minimum. |
| Consumes more memory. However, this is not relevant in this case as we will store the complete response in (client) memory anyways. And if we use the appropriate text search technique, we avoid all the extra string allocations that occur during the search & replace. | Only the size of the buffers are allocated. However, this is not relevant in this case as we will store the complete response in (client) memory anyways. |
| Does not allow to filter the stream in real-time (not relevant in this case). However, a hybrid solution is possible where we would filter certain parts in real-time (e.g. a preamble) | Allows to filter the stream in real-time so that we can make decisions based on content e.g. abort the reading (not relevant in this case). |

I'll stop here, as the most relevant performance costs of search & replace favor the full-content solution. For real-time search & replace we would basically have to implement our own algorithm that has to compete against the .NET search and replace algorithms. That's doable, but considering the effort and the final use case I would not spend any time on it.

An efficient solution could implement a custom TextReader, an advanced StreamReader that operates on a StringBuilder to search and replace characters. While StringBuilder offers a significant performance advantage over string search & replace, it does not support complex search patterns like word boundaries. Word boundaries are only possible if the pattern explicitly includes the bounding characters.

For example, replacing "int" in the input "internal int " with "pat" produces "paternal pat ". If we want to replace only the standalone word "int" in "internal int ", we would have to use a regular expression. Because regular expressions only operate on string, we pay for this with efficiency.
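
The difference can be sketched with the "internal int " input from above. This is only an illustration; WordBoundaryDemo and its method names are made up and not part of the reader shown later:

```csharp
using System.Text;
using System.Text.RegularExpressions;

public static class WordBoundaryDemo
{
    // Plain substring replacement: every occurrence of "int" is replaced,
    // including the one at the start of "internal".
    public static string ReplaceSubstring(string input)
        => new StringBuilder(input).Replace("int", "pat").ToString();

    // Regex with \b anchors: only the standalone word "int" is replaced.
    public static string ReplaceWholeWord(string input)
        => Regex.Replace(input, @"\bint\b", "pat");
}
```

ReplaceSubstring("internal int ") returns "paternal pat ", while ReplaceWholeWord("internal int ") returns "internal pat ".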

The following example implements a StringReplaceStreamReader that extends TextReader to act as a specialized StreamReader. For best performance, tokens are replaced after the complete stream has been read.
For brevity it only supports the ReadToEndAsync, Read and Peek methods.
It supports simple search where the search pattern is simply matched against the input (called simple-search).
Then it also supports two variants of regular expression search and replace for more advanced search and replace scenarios.
The first variant is based on a set of key-value pairs while the second variant uses a regex pattern provided by the caller.

Because simple-search involves iteration of the source dictionary entries + multiple passes (one for each entry) this search mode is expected to be the slowest algorithm, although the replace using a StringBuilder itself is actually faster. Under these circumstances, the Regex search & replace is expected to be significantly faster than the simple search and replace using the StringBuilder as it can process the input in a single pass.
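
To make the difference between the two modes concrete, here is a small sketch, separate from the reader below (PassDemo and its members are made-up names, and the table is reduced to two entries). It also shows a side effect of running one pass per entry that is worth keeping in mind: a later pass sees the output of an earlier pass.

```csharp
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Text.RegularExpressions;

public static class PassDemo
{
    // An array keeps the order of the passes deterministic.
    private static readonly (string Key, string Value)[] Table =
    {
        ("private", "internal"),
        ("int", "BUTTON"),
    };

    // One pass per entry: the "int" pass also hits the "internal"
    // that the "private" pass has just produced.
    public static string MultiPass(string input)
    {
        var builder = new StringBuilder(input);
        foreach ((string key, string value) in Table)
        {
            builder.Replace(key, value);
        }

        return builder.ToString();
    }

    // Single pass: one alternation of all (escaped) keys;
    // each match is looked up exactly once.
    public static string SinglePass(string input)
    {
        Dictionary<string, string> lookup = Table.ToDictionary(e => e.Key, e => e.Value);
        string pattern = string.Join("|", Table.Select(e => Regex.Escape(e.Key)));

        return Regex.Replace(input, pattern, match => lookup[match.Value]);
    }
}
```

For the input "private int x", MultiPass returns "BUTTONernal BUTTON x", whereas SinglePass returns "internal BUTTON x".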

The StringReplaceStreamReader search behavior is configurable via constructor.

Usage example


private Dictionary<string, string> replacementTable;

private async Task ReplaceInStream(Stream sourceStream)
{
  this.replacementTable = new Dictionary<string, string>
  {
    { "private", "internal" },
    { "int", "BUTTON" },
    { "Read", "Not-To-Read" }
  };

  // Search and replace using simple search 
  // (slowest).
  await using var streamReader = new StringReplaceStreamReader(sourceStream, Encoding.Default, this.replacementTable, StringComparison.OrdinalIgnoreCase);
  string text = await streamReader.ReadToEndAsync();

  // Search and replace variant #1 using regex with key-value pairs instead of a regular expression pattern
  // for advanced scenarios (fast)
  await using var streamReader2 = new StringReplaceStreamReader(sourceStream, Encoding.Default, this.replacementTable, SearchMode.Regex);
  string text2 = await streamReader2.ReadToEndAsync();

  // Search and replace variant #2 using regex and regular expression pattern 
  // for advanced scenarios (fast).
  // The matchEvaluator callback actually provides the replacement value for each match.

  // Creates the following regular expression:
  // "\bprivate\b|\bint\b|\bRead\b"
  string searchPattern = this.replacementTable.Keys
    .Select(key => $@"\b{key}\b")
    .Aggregate((current, newValue) => $"{current}|{newValue}");

  await using var streamReader3 = new StringReplaceStreamReader(sourceStream, Encoding.Default, searchPattern, Replace);
  string text3 = await streamReader3.ReadToEndAsync();
}


private string Replace(Match match) 
  => this.replacementTable.TryGetValue(match.Value, out string replacement) 
    ? replacement 
    : match.Value;

Implementation

SearchMode

public enum SearchMode
{
  Default = 0,
  Simple,
  Regex
}

StringReplaceStreamReader.cs

public class StringReplaceStreamReader : TextReader, IDisposable, IAsyncDisposable
{
  public Stream BaseStream { get; }
  public long Length => this.BaseStream.Length;
  public bool EndOfStream => this.BaseStream.Position == this.BaseStream.Length;

  public SearchMode SearchMode { get; }

  private const int DefaultCapacity = 4096;
  private readonly IDictionary<string, string> stringReplaceTable;
  private readonly StringComparison stringComparison;
  private readonly Encoding encoding;
  private readonly Decoder decoder;
  private readonly MatchEvaluator? matchEvaluator;
  private readonly Regex? regularExpression;
  private readonly byte[] byteBuffer;
  private readonly char[] charBuffer;

  public StringReplaceStreamReader(Stream stream, Encoding encoding, IDictionary<string, string> stringReplaceTable)
    : this(stream, encoding, stringReplaceTable, StringComparison.OrdinalIgnoreCase, SearchMode.Simple)
  {
  }

  public StringReplaceStreamReader(Stream stream, Encoding encoding, IDictionary<string, string> stringReplaceTable, SearchMode searchMode)
    : this(stream, encoding, stringReplaceTable, StringComparison.OrdinalIgnoreCase, searchMode)
  {
  }

  public StringReplaceStreamReader(Stream stream, Encoding encoding, IDictionary<string, string> stringReplaceTable, StringComparison stringComparison)
    : this(stream, encoding, stringReplaceTable, stringComparison, SearchMode.Simple)
  {
  }

  public StringReplaceStreamReader(Stream stream, Encoding encoding, IDictionary<string, string> stringReplaceTable, StringComparison stringComparison, SearchMode searchMode)
  {
    ArgumentNullException.ThrowIfNull(stream, nameof(stream));
    ArgumentNullException.ThrowIfNull(stringReplaceTable, nameof(stringReplaceTable));

    this.BaseStream = stream;
    this.encoding = encoding ?? Encoding.Default;
    this.decoder = this.encoding.GetDecoder();
    this.stringReplaceTable = stringReplaceTable;
    this.stringComparison = stringComparison;
    this.SearchMode = searchMode;
    this.regularExpression = null;
    this.matchEvaluator = ReplaceMatch;

    if (searchMode is SearchMode.Regex)
    {
      RegexOptions regexOptions = CreateDefaultRegexOptions(stringComparison);

      var searchPatternBuilder = new StringBuilder();
      foreach (KeyValuePair<string, string> entry in stringReplaceTable)
      {
        // Creates the following regular expression:
        // "\b[search_key]\b|\b[search_key]\b"
        // Escape the key so that regex metacharacters in it are matched literally.
        string pattern = @$"\b{Regex.Escape(entry.Key)}\b";
        searchPatternBuilder.Append(pattern);
        searchPatternBuilder.Append('|');
      }

      string searchPattern = searchPatternBuilder.ToString().TrimEnd('|');
      this.regularExpression = new Regex(searchPattern, regexOptions);
    }

    this.byteBuffer = new byte[StringReplaceStreamReader.DefaultCapacity];
    int charBufferSize = this.encoding.GetMaxCharCount(this.byteBuffer.Length);
    this.charBuffer = new char[charBufferSize];
  }

  public StringReplaceStreamReader(Stream stream, Encoding encoding, string searchAndReplacePattern, MatchEvaluator matchEvaluator)
    : this(stream, encoding, searchAndReplacePattern, matchEvaluator, RegexOptions.None)
  {
  }

  public StringReplaceStreamReader(Stream stream, Encoding encoding, string searchAndReplacePattern, MatchEvaluator matchEvaluator, RegexOptions regexOptions)
  {
    ArgumentNullException.ThrowIfNull(stream, nameof(stream));
    ArgumentException.ThrowIfNullOrWhiteSpace(searchAndReplacePattern, nameof(searchAndReplacePattern));
    ArgumentNullException.ThrowIfNull(matchEvaluator, nameof(matchEvaluator));

    this.BaseStream = stream;
    this.encoding = encoding ?? Encoding.Default;
    this.decoder = this.encoding.GetDecoder();
    this.matchEvaluator = matchEvaluator;
    this.SearchMode = SearchMode.Regex;
    this.stringReplaceTable = new Dictionary<string, string>();
    this.stringComparison = StringComparison.OrdinalIgnoreCase;

    if (regexOptions is RegexOptions.None)
    {
      regexOptions = CreateDefaultRegexOptions(stringComparison);
    }
    else if ((regexOptions & RegexOptions.Compiled) == 0)
    {
      regexOptions |= RegexOptions.Compiled;
    }

    this.regularExpression = new Regex(searchAndReplacePattern, regexOptions);
    this.byteBuffer = new byte[StringReplaceStreamReader.DefaultCapacity];
    int charBufferSize = this.encoding.GetMaxCharCount(this.byteBuffer.Length);
    this.charBuffer = new char[charBufferSize];
  }

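  // Note: Peek requires a seekable base stream.
  public override int Peek()
  {
    int value = Read();

    // Only rewind if a byte was actually consumed; Read() returns -1 at the end of the stream.
    if (value != -1)
    {
      this.BaseStream.Seek(-1, SeekOrigin.Current);
    }

    return value;
  }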

  public override int Read()
    => this.BaseStream.ReadByte();

  public override Task<string> ReadToEndAsync()
    => ReadToEndAsync(CancellationToken.None);

  public override async Task<string> ReadToEndAsync(CancellationToken cancellationToken)
  {
    if (!this.BaseStream.CanRead)
    {
      throw new InvalidOperationException("Source stream is not readable.");
    }

    var textBuilder = new StringBuilder(StringReplaceStreamReader.DefaultCapacity);

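    while (true)
    {
      cancellationToken.ThrowIfCancellationRequested();

      int bytesRead = await this.BaseStream.ReadAsync(this.byteBuffer, 0, this.byteBuffer.Length, cancellationToken);

      // A read of zero bytes signals the end of the stream. Checking this instead of
      // EndOfStream also works for non-seekable streams (e.g. network streams).
      bool flush = bytesRead == 0;
      int charsRead = this.decoder.GetChars(this.byteBuffer, 0, bytesRead, this.charBuffer, 0, flush);
      textBuilder.Append(this.charBuffer, 0, charsRead);

      if (flush)
      {
        break;
      }
    }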

    cancellationToken.ThrowIfCancellationRequested();
    SearchAndReplace(textBuilder, cancellationToken, out string result);

    return result;
  }

  public ValueTask DisposeAsync()
    => ((IAsyncDisposable)this.BaseStream).DisposeAsync();

  private void SearchAndReplace(StringBuilder textBuilder, CancellationToken cancellationToken, out string result)
  {
    cancellationToken.ThrowIfCancellationRequested();

    if (this.SearchMode is SearchMode.Simple or SearchMode.Default)
    {
      foreach (KeyValuePair<string, string> entry in this.stringReplaceTable)
      {
        cancellationToken.ThrowIfCancellationRequested();

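        // Note: StringBuilder.Replace always compares ordinally and case-sensitively,
        // so the configured StringComparison is not applied in simple mode.
        textBuilder.Replace(entry.Key, entry.Value);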
      }

      result = textBuilder.ToString();
    }
    else if (this.SearchMode is SearchMode.Regex)
    {
      string input = textBuilder.ToString();
      result = this.regularExpression!.Replace(input, this.matchEvaluator!);
    }
    else
    {
      throw new NotImplementedException($"Search mode {this.SearchMode} is not implemented.");
    }
  }

  private string ReplaceMatch(Match match)
    => this.stringReplaceTable.TryGetValue(match.Value, out string replacement)
      ? replacement
      : match.Value;

  private RegexOptions CreateDefaultRegexOptions(StringComparison stringComparison)
  {
    RegexOptions regexOptions = RegexOptions.Multiline | RegexOptions.Compiled;
    if (stringComparison is StringComparison.CurrentCultureIgnoreCase or StringComparison.InvariantCultureIgnoreCase or StringComparison.OrdinalIgnoreCase)
    {
      regexOptions |= RegexOptions.IgnoreCase;
    }

    return regexOptions;
  }
}
shingo On

My thought:

  1. Read some chars from the input stream and fill a buffer.
  2. Search for old[0] in the buffer:
    • If it is not found, output the whole buffer, then go to step 1.
    • If it is found, compare the following chars against the rest of old:
      • If the complete old sequence is found, first output the chars before the position of old[0], then output new. Go back to step 2.
      • If the end of the buffer is reached, first output the chars before the position of old[0], then move the remaining chars to the start of the buffer. Go to step 1.
      • If a char doesn't match, go back to step 2 and continue the search from the next char.
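
A rough sketch of this idea in C#, under a few assumptions: it works on a TextReader/TextWriter pair (so decoding is already handled), it only supports ordinal comparison, and instead of comparing char by char from old[0] it searches for the whole token and carries the last old.Length - 1 chars over to the next read, which covers the "reached the end of the buffer" case. StreamingReplace, ReplaceAsync and the parameter names are made up for illustration.

```csharp
using System;
using System.IO;
using System.Threading.Tasks;

public static class StreamingReplace
{
    public static async Task ReplaceAsync(TextReader input, TextWriter output,
        string oldValue, string newValue, bool ignoreCase = false, int bufferSize = 4096)
    {
        if (string.IsNullOrEmpty(oldValue))
        {
            throw new ArgumentException("Search token must not be empty.", nameof(oldValue));
        }

        // The buffer must hold the carried-over tail plus at least one full token.
        var buffer = new char[Math.Max(bufferSize, oldValue.Length * 2)];
        int filled = 0; // number of valid chars in the buffer (carry + new input)

        while (true)
        {
            int read = await input.ReadAsync(buffer, filled, buffer.Length - filled);
            bool endOfInput = read == 0;
            filled += read;

            // Replace every complete match that is currently in the buffer.
            int position = 0;
            int index;
            while ((index = IndexOf(buffer, position, filled - position, oldValue, ignoreCase)) >= 0)
            {
                await output.WriteAsync(buffer, position, index);
                await output.WriteAsync(newValue);
                position += index + oldValue.Length;
            }

            // The last (oldValue.Length - 1) chars may be the start of a match
            // that continues in the next read, so keep them unless input has ended.
            int keep = endOfInput ? 0 : Math.Min(oldValue.Length - 1, filled - position);
            await output.WriteAsync(buffer, position, filled - position - keep);
            Array.Copy(buffer, filled - keep, buffer, 0, keep);
            filled = keep;

            if (endOfInput)
            {
                break;
            }
        }
    }

    // Synchronous helper so that no span is used across an await.
    // Returns the index relative to start, or -1 if there is no match.
    private static int IndexOf(char[] buffer, int start, int count, string value, bool ignoreCase)
    {
        StringComparison comparison = ignoreCase
            ? StringComparison.OrdinalIgnoreCase
            : StringComparison.Ordinal;

        return new ReadOnlySpan<char>(buffer, start, count).IndexOf(value.AsSpan(), comparison);
    }
}
```

With a StringReader over "aaaafoobb", a StringWriter, oldValue "foo", newValue "bar" and a deliberately tiny bufferSize of 1 (the buffer is then 6 chars), the token is split across two reads and the writer still ends up with "aaaabarbb". Memory use is bounded by the buffer size rather than by the length of the stream.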