Creating a ZipArchive from a gRPC connection

I'm having trouble creating a ZipArchive from data sent over a gRPC connection. The setup: I have a service with a function that builds a zip. It gets data from a third-party service, generates a CSV file from that data, and puts it into a zip archive. I then take the MemoryStream that backs the zip, read it to the end as a string, send that string to another service, and clear the stream. This is necessary because the data from the third-party service is likely to be very large, so I need to be able to send it in chunks. After the zip is closed by its using block, I send the EOCD (end of central directory) bytes.

On the other side, I receive those strings, convert them to byte arrays, concatenate them, and try to recreate the zip file.

The ZipArchive is created successfully, but it has no entries. I get the error "Error executing Write request: Number of entries expected in End Of Central Directory does not correspond to number of entries in Central Directory." After digging into the internals of the ZipArchive class, I found that one entry was expected in the central directory, but zero were read.

The number of bytes is the same in both services (sender and receiver), and the data is definitely a zip, because the header bytes match the zip archive specification. I have no idea what is wrong.
The code is below.

Service Sender

// just call for destination
    using var call = await _destinationService.InitCall();

    using (var stream = new MemoryStream())
    {
        using (var zip = new ZipArchive(stream, ZipArchiveMode.Create, true))
        {
            var entry = zip.CreateEntry($"{Model.FileName}.{Model.Format}");
            using var writer = new StreamWriter(entry.Open(), Model.Encoding);
            var csvConfiguration = new CsvConfiguration(CultureInfo.InvariantCulture)
            {
                Delimiter = Model.Delimiter
            };
            using var csv = new CsvWriter(writer, csvConfiguration);

            var columns = await _exportService.GetAndWriteColumns(csv, firstChunk, Model, cancellationToken);
            // initial chunk to the MemoryStream
            await _exportService.PerformExport();

            while (_substreamData.TryDequeue(out var data) || !_isSubstreamEof)
            {
                // next chunks
                await _exportService.PerformExport();
            }
        }
        // sending eocd bytes after closing zip
        await _writeDataService.WriteDataPartAsync(call, stream, cancellationToken);
        await call.RequestStream.CompleteAsync();
    } 

    // PerformExport
    public async Task PerformExport(CsvWriter csv, StreamWriter writer)
    {
        // generating csv, getting data from third party services
        await csv.FlushAsync();
        await writer.FlushAsync();

        await WriteDataPartAsync(call, stream, cancellationToken);
        stream.Clear();
    }
    public async Task WriteDataPartAsync(
        AsyncDuplexStreamingCall<WriteData, WriteResponse> call,
        Stream stream,
        CancellationToken cancellationToken)
    {
        stream.Seek(0, SeekOrigin.Begin);
        using (var streamReader = new StreamReader(stream, leaveOpen:true))
        {
            var streamString = await streamReader.ReadToEndAsync(cancellationToken);

            await call.RequestStream.WriteAsync(
                new WriteData
                {
                    DataPart = new WriteChunk
                    {
                        Data = streamString
                    }
                },
                cancellationToken);
        }
    }
    
    public static void Clear(this MemoryStream source)
    {
        var buffer = source.GetBuffer();
        Array.Clear(buffer, 0, buffer.Length);
        source.Position = 0;
        source.SetLength(0);
    }

Service receiver

var bytes = new List<byte[]>();
while (await requestStream.MoveNext(context.CancellationToken))
{
    try
    {
        var data = Encoding.UTF8.GetBytes(requestStream.Current.DataPart.Data);
        bytes.Add(data);
    }
    catch (Exception ex)
    {
        _logger.LogError($"Error executing Write request: {ex.Message}");
        errors.Add(ex.Message);
    }
}

var stream1 = new MemoryStream(bytes.SelectMany(x => x).ToArray());
using var zipArchiveSubexport = new ZipArchive(stream1); // success
var t = zipArchiveSubexport.Entries.First(); // error because of the number of entries

What am I doing wrong?

There are 2 answers

Charlieface (best answer)

Your issue is that you are reading the zip data as if it were a UTF-8 encoded string. That isn't going to work, because many byte sequences are not valid UTF-8 and are replaced when decoded, so the original bytes are lost.
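
To make the difference concrete, here is a minimal sketch (not taken from the question's code) that pushes a few zip-like bytes through a UTF-8 string and through Base64 and compares the results. The sample byte values are an assumption chosen for illustration: the first four are the zip local file header signature, and 0xFF and 0x80 are bytes that are not valid UTF-8 on their own.

```csharp
using System;
using System.Linq;
using System.Text;

// "PK\x03\x04" followed by two bytes (0xFF, 0x80) that cannot be decoded as UTF-8.
byte[] original = { 0x50, 0x4B, 0x03, 0x04, 0xFF, 0x80, 0x14, 0x00 };

// Round trip through a UTF-8 string: invalid sequences become U+FFFD,
// so encoding the string again does not give back the original bytes.
byte[] viaUtf8 = Encoding.UTF8.GetBytes(Encoding.UTF8.GetString(original));

// Round trip through Base64: every byte value is representable.
byte[] viaBase64 = Convert.FromBase64String(Convert.ToBase64String(original));

bool utf8Intact = original.SequenceEqual(viaUtf8);
bool base64Intact = original.SequenceEqual(viaBase64);

Console.WriteLine($"UTF-8 round trip intact:  {utf8Intact}");
Console.WriteLine($"Base64 round trip intact: {base64Intact}");
```

The UTF-8 round trip should report False and the Base64 round trip True, which is the same kind of damage that happens to the compressed data in the archive.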

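You need to pass the bytes directly. This assumes you can declare the Data field as bytes in the .proto file, which maps to Google.Protobuf.ByteString in C#, and that the stream is a MemoryStream:

public async Task WriteDataPartAsync(
    AsyncDuplexStreamingCall<WriteData, WriteResponse> call,
    MemoryStream stream,
    CancellationToken cancellationToken)
{
    await call.RequestStream.WriteAsync(
            new WriteData
            {
                DataPart = new WriteChunk
                {
                    // Copy only the bytes written so far, not the whole internal buffer.
                    Data = ByteString.CopyFrom(stream.GetBuffer(), 0, (int)stream.Length),
                }
            },
            cancellationToken);
}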

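And on the other side, append every chunk to one buffer and open the archive once the request stream has ended:

var stream1 = new MemoryStream();
while (await requestStream.MoveNext(context.CancellationToken))
    requestStream.Current.DataPart.Data.WriteTo(stream1);
stream1.Position = 0;
using var zipArchiveSubexport = new ZipArchive(stream1);
var t = zipArchiveSubexport.Entries.First();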
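
To check the byte-based approach without a gRPC call in between, here is a minimal sketch that builds a small archive in memory, splits its bytes into chunks, reassembles them, and reads the entry back. The entry name, CSV content, and chunk size are hypothetical values for illustration, and the chunks stand in for the payloads of successive messages.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.IO.Compression;
using System.Linq;

// Build a small archive in memory (stand-in for the sender's export).
byte[] zipBytes;
using (var source = new MemoryStream())
{
    using (var zip = new ZipArchive(source, ZipArchiveMode.Create, leaveOpen: true))
    {
        var entry = zip.CreateEntry("export.csv"); // hypothetical entry name
        using var writer = new StreamWriter(entry.Open());
        writer.Write("id;name\n1;example\n");
    }
    // Read the bytes only after the archive is disposed, so the
    // central directory and EOCD record are included.
    zipBytes = source.ToArray();
}

// Split into fixed-size chunks, as raw bytes rather than strings.
const int chunkSize = 16; // hypothetical, tiny on purpose
List<byte[]> chunks = zipBytes.Chunk(chunkSize).ToList();

// Receiver side: append the chunks in order, rewind, then open the archive.
using var received = new MemoryStream();
foreach (var chunk in chunks)
    received.Write(chunk, 0, chunk.Length);
received.Position = 0;

using var archive = new ZipArchive(received, ZipArchiveMode.Read);
using var reader = new StreamReader(archive.Entries.Single().Open());
string content = reader.ReadToEnd();
Console.WriteLine(content);
```

Because the chunks are never decoded as text, the reassembled buffer is byte-for-byte identical to what the sender produced, and the entry count in the central directory matches.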

If you can't change the format of the packet, you will have to use Base64 encoding instead.

public async Task WriteDataPartAsync(
    AsyncDuplexStreamingCall<WriteData, WriteResponse> call,
    MemoryStream stream,
    CancellationToken cancellationToken)
{
    await call.RequestStream.WriteAsync(
            new WriteData
            {
                DataPart = new WriteChunk
                {
                    Data = Convert.ToBase64String(stream.GetBuffer(), 0, (int)stream.Length),
                }
            },
            cancellationToken);
}

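And on the other side, decode each chunk separately and append the bytes to one buffer:

var stream1 = new MemoryStream();
while (await requestStream.MoveNext(context.CancellationToken))
    stream1.Write(Convert.FromBase64String(requestStream.Current.DataPart.Data));
stream1.Position = 0;
using var zipArchiveSubexport = new ZipArchive(stream1);
var t = zipArchiveSubexport.Entries.First();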
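
One detail worth keeping in mind with Base64 and several chunks: each message is its own Base64 string, possibly with padding at the end, so the strings should be decoded one by one and the resulting bytes appended, rather than joined into one long string first. A minimal sketch, assuming a hypothetical 10-byte payload split into chunks of 4 bytes:

```csharp
using System;
using System.IO;
using System.Linq;

// Hypothetical payload: 10 arbitrary byte values (0, 25, 50, ... 225).
byte[] payload = Enumerable.Range(0, 10).Select(i => (byte)(i * 25)).ToArray();

// Sender: encode each chunk on its own (chunks of 4, 4 and 2 bytes here).
string[] messages = payload
    .Chunk(4)
    .Select(chunk => Convert.ToBase64String(chunk))
    .ToArray();

// Receiver: decode each message separately and append the raw bytes.
using var rebuilt = new MemoryStream();
foreach (string message in messages)
{
    byte[] chunk = Convert.FromBase64String(message);
    rebuilt.Write(chunk, 0, chunk.Length);
}

bool intact = payload.SequenceEqual(rebuilt.ToArray());
Console.WriteLine($"Payload intact after per-chunk Base64: {intact}");
```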

Stanislav Muryndin

It's all about the encoding. UTF-8 is not suitable for this purpose. I rewrote it to use a Base64 string and it works.