Simple process here: a CSV file uploaded to an S3 bucket triggers a Lambda that converts it to JSON. I can't predict the size of the incoming CSV files (they could be up to several GB), which is why I'm trying to use streams throughout.
I've got it working, but the memory footprint in Lambda is heavy, and I'm looking to understand how I can improve it and what my options are. When testing with a 120MB CSV, the function consumes over 950MB of memory. Can I do better? Maybe not, since I'm reading the source file line by line rather than in chunks.
Lambda:
export const ingestionConversionHandler = async (event: S3Event): Promise<boolean> => {
  const { bucket, object } = event.Records[0].s3;
  try {
    let res = await convertFile({
      bucketName: bucket.name,
      objectKey: object.key
    });
    return res;
  } catch (e) {
    console.error("ERROR:", e);
    return false;
  }
};
...which calls convertFile:
export const convertFile = async (params: ConversionParams): Promise<boolean> => {
  try {
    //get source file stream
    const readStream = await getStreamedFile({ keyName: params.objectKey });
    //Transform for CSV-to-JSON conversion
    const transformStream = new JsonTransformLineStream({});
    //required by S3 Upload object
    const passStream = new PassThrough({
      readableHighWaterMark: 1000 * 1024 * 1024 //1GB
    });
    //pass source file stream to read line-by-line
    const lineReader = createInterface({
      input: readStream,
      output: process.stdout,
      terminal: false,
      crlfDelay: Infinity //handle \r\n as single line break
    });
    //S3 Upload
    const upload = getUpload({
      keyName: params.objectKey,
      streamBody: passStream
    });
    return new Promise<boolean>((resolve, reject) => {
      //pipe together and run
      transformStream.pipe(passStream);
      /**
       * EVENTS
       */
      lineReader.on("line", (line: string) => {
        transformStream.write(line);
      });
      lineReader.on("close", () => {
        transformStream.end();
      });
      readStream.on("end", () => {
        console.log("Read stream ended.");
      });
      readStream.on("error", (error) => {
        console.error("Error in read stream:", error);
      });
      //handle end of stream upload
      transformStream.on("end", async () => {
        console.log("Transform stream ended.");
        //CRITICAL: This triggers and completes the upload stream
        await upload.done();
        console.log("Stream upload completed...");
        resolve(true);
      });
      transformStream.on("error", (error) => {
        console.error("Error in transform stream:", error);
        reject(false);
      });
    });
  } catch (e) {
    console.error("Error converting file:", e);
    throw e;
  }
};
I'm guessing the memory consumption comes from setting the passthrough stream's readableHighWaterMark prop so high, but the Lambda bombs without it.
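If the problem is unbounded buffering between the line reader and the transform, one option I could see (sketched below, reusing the lineReader and transformStream from convertFile above) is to respect backpressure instead of raising the highWaterMark: pause the line reader whenever write() returns false and resume on "drain". In theory that would let passStream keep its default highWaterMark, but I may be missing something:

//Sketch only: throttle the line reader instead of buffering everything downstream
lineReader.on("line", (line: string) => {
  //write() returns false once the transform's internal buffer hits its highWaterMark
  if (!transformStream.write(line)) {
    lineReader.pause();
    transformStream.once("drain", () => {
      lineReader.resume();
    });
  }
});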
For posterity, here's getStreamedFile:
export const getStreamedFile = async (params: BucketParams): Promise<Readable> => {
  try {
    const client = new S3Client({});
    const command = new GetObjectCommand({
      Bucket: process.env.S3_BUCKET,
      Key: params.keyName,
    });
    let res = await client.send(command);
    const body = res.Body! as Readable;
    return body;
  } catch (e) {
    console.error(e);
    throw e;
  }
};
...and getUpload:
export const getUpload = (params: UploadParams): Upload => {
  try {
    let extension = "json";
    const file = params.fileName ?? params.keyName;
    const [fileName, ext] = file.split(".");
    if (params.fileName) {
      extension = ext;
    }
    const uploadKeyName = `${fileName}.${extension}`;
    const contentType = getMimeType(uploadKeyName);
    const s3params = {
      Bucket: process.env.S3_BUCKET,
      Key: uploadKeyName,
      Body: params.streamBody,
      ContentType: contentType
    };
    const upload = new Upload({
      client: new S3Client({}),
      queueSize: 4,
      params: s3params
    });
    const defaultProgress = (progress: Progress) => {
      console.log(`Uploaded part: ${progress.part}`);
      console.log(`...loaded: ${progress.loaded}`);
      console.log(`...total: ${progress.total}`);
    };
    upload.on("httpUploadProgress", params.onUploadProgress || defaultProgress);
    return upload;
  } catch (e) {
    console.error(e);
    throw e;
  }
};
The transformStream is just a basic Transform that overrides _transform for formatting, so I didn't include it here.
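For a sense of its shape, it's along these lines (a simplified sketch only; the real per-line formatting is more involved):

import { Transform, TransformCallback } from "stream";

//Simplified sketch; actual formatting logic omitted
export class JsonTransformLineStream extends Transform {
  _transform(line: Buffer | string, _encoding: BufferEncoding, callback: TransformCallback): void {
    //e.g. turn one CSV line into one JSON line
    const fields = line.toString().split(",");
    callback(null, JSON.stringify(fields) + "\n");
  }
}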