.NET 6 console app: multiple BlockingCollection<T> instances cause huge CPU consumption

I have a .NET 6 console app that uses several BlockingCollection<T> instances to process files that are dropped into a folder. I watch the folder using .NET's FileSystemWatcher.

In the Created event handler, I write the event to a Channel. The processing is done in two phases, and after each phase the resulting item is added to a BlockingCollection that is then consumed by the next phase.

Program.cs

public static async Task Main(string[] args)
{
   BlockingCollection<FileMetadata> _fileMetaDataQueue = new BlockingCollection<FileMetadata>();
   var channel = Channel.CreateUnbounded<FileSystemEventArgs>();     

   // Start a task to monitor the channel and process notifications
   var notificationProcessor = Task.Run(() => ProcessNotifications(channel, _fileMetaDataQueue));

   Task fileCopyingTask = Task.Run(() => fileCopyThread.Start()); //injected using DI

   Task processMovedFile = Task.Run(() => ProcessDestinationThread.Start()); //injected using DI

   Task retryOnErrorTask = Task.Run(() => RetryOnErrorThread.Start()); //injected using DI
   using var watcher = new FileSystemWatcher(sourceFolder); //C:\temp
   // other fw related config
   watcher.Created += (sender, e) => channel.Writer.WriteAsync(e);
}

private async Task ProcessNotifications(Channel<FileSystemEventArgs> channel, BlockingCollection<FileMetadata> queue)
{
   await foreach (var e in channel.Reader.ReadAllAsync())
   {
      Thread.Sleep(300); // So the file is released after it is dropped
      try
      {
         // Process the file and add its name and extension to the queue                
         FileMetadata fileInfo = ExtractFileMetadata(e.FullPath); // processing method
         queue.Add(fileInfo);
      }
      catch (Exception)
      {
         // logging etc
      }
   }           
}

The BlockingCollection queue is then consumed in the FileCopyThread class, with the Start() method exposed (and called)

FileCopyThread.cs

BlockingCollection<FileMetadata> resultQueue = new();
BlockingCollection<FileMetadata> retryQueue = new();

public async Task Start()
{
    await Task.Run(() => {
        ProcessQueue();
   });
}

private void ProcessQueue()
{
    // Since IsCompleted is never set, it will always run
    while (!fileMetadataQueue.IsCompleted)
    {
        // Try to remove an item from the queue
        if (fileMetadataQueue.TryTake(out FileMetadata result))
        {
           // Copy the file to a new location
           var newFileLocation = processorOps.MoveFile(result); // move file to other path
                    
           // Add the new file location to the result queue
           if (newFileLocation != String.Empty) 
           { 
               result.newFileLocation = newFileLocation;
               resultQueue.Add(result); 
           }
           else {                      
              retryQueue.Add(result);                        
           }
        }
    }
}

The ProcessDestinationThread and RetryOnErrorThread work in exactly the same way, but do some different processing, and consume the resultQueue and the retryQueue, respectively.

When I run this app, it works fine and everything gets processed as expected, but my CPU and power usage is between 85% and 95%, which is huge, IMO. It stays that high even when the app is not processing anything and is just sitting idle. I figured this is because of all the infinite loops, but how can I remedy this?

Bird's-eye view: when the FileSystemWatcher's Created event is not firing (i.e. no files are being dropped), I would like all the queues after it to sit idle, so to speak. There is no need for constant checking in that case.

I thought about calling CompleteAdding() on the BlockingCollection<T>, but it seems that this cannot be reversed, and the app is supposed to run indefinitely: even if the drop folder is empty, it might receive new files at any time.

Is there a way to reduce the CPU usage of my application?

P.S. I am aware that this code is not a fully working example. The real code is far more complex than this, and I had to remove a lot of distracting details. If you think any relevant pieces of code are missing, I can provide them. I hope this code at least makes clear what I am trying to achieve.

There is 1 answer

Theodor Zoulias
private void ProcessQueue()
{
   while (!fileMetadataQueue.IsCompleted)
   {
       if (fileMetadataQueue.TryTake(out FileMetadata result))
       {
           //...
       }
   }
}

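This pattern for consuming a BlockingCollection<T> is incorrect. The parameterless TryTake does not wait: when the collection is empty it returns false immediately, so the while loop spins in a tight loop that unproductively burns a CPU core. The correct pattern is to use the GetConsumingEnumerable method, which blocks the consuming thread while the collection is empty: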

private void ProcessQueue()
{
    foreach (FileMetadata result in fileMetadataQueue.GetConsumingEnumerable())
    {
        //...
    }
}
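
To illustrate how this applies to the chained queues in the question, here is a minimal sketch, not the asker's actual code. The class name BlockingPipelineSketch, the Run method, and the use of plain strings in place of FileMetadata are all assumptions made for the example, and ToUpperInvariant is only a placeholder for the real file move. Each stage blocks inside GetConsumingEnumerable while its input queue is empty, and CompleteAdding is called only once, at shutdown, so the loops can exit.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical stand-in for the question's pipeline; strings replace FileMetadata.
public static class BlockingPipelineSketch
{
    public static List<string> Run(IEnumerable<string> paths)
    {
        using var fileMetadataQueue = new BlockingCollection<string>();
        using var resultQueue = new BlockingCollection<string>();
        var processed = new List<string>();

        // Stage 1: the foreach blocks while fileMetadataQueue is empty,
        // so an idle consumer does not spin.
        Task stage1 = Task.Factory.StartNew(() =>
        {
            foreach (string path in fileMetadataQueue.GetConsumingEnumerable())
            {
                // Placeholder for the real work (e.g. moving the file).
                resultQueue.Add(path.ToUpperInvariant());
            }
            // Reached only after the input queue is completed and drained;
            // pass the shutdown signal on to the next stage.
            resultQueue.CompleteAdding();
        }, TaskCreationOptions.LongRunning);

        // Stage 2: consumes the output of stage 1 in the same blocking fashion.
        Task stage2 = Task.Factory.StartNew(() =>
        {
            foreach (string moved in resultQueue.GetConsumingEnumerable())
            {
                processed.Add(moved);
            }
        }, TaskCreationOptions.LongRunning);

        // Producer side: in the question this would be the channel reader.
        foreach (string path in paths)
        {
            fileMetadataQueue.Add(path);
        }

        // Needed only to let this demo finish; an app that runs indefinitely
        // would call it when shutting down, if at all.
        fileMetadataQueue.CompleteAdding();
        Task.WaitAll(stage1, stage2);
        return processed;
    }
}
```

Calling BlockingPipelineSketch.Run(new[] { "a.txt", "b.txt" }) from a console app's Main runs both stages to completion and returns the processed items. In a long-running service, one option is to never call CompleteAdding and instead pass a CancellationToken to the GetConsumingEnumerable overload that accepts one, so the consumers can be stopped on exit.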