Parallel.ForEach returning same fileObject twice


I'm trying to read a VERY large number of files from AWS S3 storage. No issues with reading them, etc., but every now and again I get two threads with the SAME fileObject.

I've tried keeping a List<string> or a ConcurrentBag<string> of the files I've already processed, but when two threads hit at the same time, both think they are the first one and the same duplication still happens.

Any ideas how I could work around or prevent this?

List<S3Object> fileObjects = listFilesResponse.Result.S3Objects;
filecount += listFilesResponse.Result.KeyCount;

ParallelOptions paralleOpts = new ParallelOptions
{
    MaxDegreeOfParallelism = Environment.ProcessorCount
};

Parallel.ForEach(fileObjects, paralleOpts, fileObject =>
{
    fileRequest.Key = fileObject.Key;

    using (var fileResponse = s3Client.GetObjectAsync(fileRequest))
    using (Stream responseStream = fileResponse.Result.ResponseStream)
    using (StreamReader reader = new StreamReader(responseStream))
    {
        //do stuff with file...
    }
});
1 Answer

Panagiotis Kanavos

Parallel.ForEach doesn't return results from the loop body. It's definitely not meant for asynchronous or I/O calls either, only for data parallelism, i.e. processing lots of in-memory data by partitioning it and using all cores to process the partitions.

The first big problem is using a single shared variable, fileRequest, to build the request for every object. Threads will overwrite each other's changes and can easily end up making the same request twice.
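
For illustration only (not part of the original answer), here is a minimal standalone sketch of that race with no AWS involved: all iterations share one mutable request-like object, so by the time an iteration gets around to its "download", the Key it set may already have been overwritten by another iteration.

using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

class SharedRequest { public string Key; }

class RaceDemo
{
    static void Main()
    {
        var shared = new SharedRequest();               // one request object shared by all iterations
        var downloaded = new ConcurrentQueue<string>();

        Parallel.ForEach(new[] { "file-A", "file-B", "file-C", "file-D" }, key =>
        {
            shared.Key = key;                           // every thread writes the same field
            Thread.Sleep(10);                           // simulate network latency before the "download"
            downloaded.Enqueue(shared.Key);             // may read a key written by another thread
        });

        // Often prints the same key more than once and skips another one entirely
        Console.WriteLine(string.Join(", ", downloaded));
    }
}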

List<T> isn't thread-safe either, so modifications made to it by Parallel.ForEach threads will result in either concurrency exceptions or bad data.
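
Not from the original answer, but relevant to the deduplication attempt in the question: neither List<string> nor ConcurrentBag<string> offers an atomic "add if not already present" operation, so two threads can both check, both see nothing, and both proceed. If de-duplication is still wanted as a safety net, ConcurrentDictionary<TKey, TValue>.TryAdd performs the check and the insert as a single atomic step. A minimal sketch:

using System;
using System.Collections.Concurrent;

var processedKeys = new ConcurrentDictionary<string, byte>();

// TryAdd returns true only for the first caller with a given key;
// the "have we seen it?" check and the insert cannot be interleaved by another thread.
bool ShouldProcess(string key) => processedKeys.TryAdd(key, 0);

Console.WriteLine(ShouldProcess("file-A")); // True  - first time this key is seen
Console.WriteLine(ShouldProcess("file-A")); // False - duplicate detected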

The code needs a bit of fixing:

  • A new request object should be created each time
  • AWS methods are asynchronous, so the method should be asynchronous as well. Blocking on the asynchronous methods (e.g. with .Result) will only cause problems.
  • Instead of Parallel.ForEach, use Parallel.ForEachAsync. That method is meant for asynchronous operations. It still doesn't return any results.
  • Store the data in a concurrent collection like ConcurrentQueue<> or ConcurrentDictionary<>. ConcurrentBag is a specialized collection, meant for thread-local storage.
async Task<ConcurrentQueue<ResultDTO>> MyDownloaderAsync()
{
    var results = new ConcurrentQueue<ResultDTO>();
    // ... build the ListObjectsV2Request ...
    var listResponse = await s3Client.ListObjectsV2Async(request);

    await Parallel.ForEachAsync(listResponse.S3Objects, async (fileObject, token) =>
    {
        // A fresh request object for every file, so iterations can't overwrite each other
        var fileRequest = new GetObjectRequest
        {
            BucketName = bucketName,
            Key = fileObject.Key
        };

        using var fileResponse = await s3Client.GetObjectAsync(fileRequest, token);
        using var reader = new StreamReader(fileResponse.ResponseStream);
        //do stuff with file...
        // Create a `result` object and enqueue it in the thread-safe collection
        results.Enqueue(result);
    });

    return results;
}
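
For completeness (again, not part of the original answer), calling the fixed method would look roughly like this, assuming ResultDTO and the surrounding S3 client setup already exist:

var results = await MyDownloaderAsync();
foreach (ResultDTO dto in results)
{
    // consume each downloaded result here
}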