Scala FS2 Optimization

189 views Asked by At

I wrote a little data processing pipeline with FS2 (my first try). It works but is pretty clunky. The input data are flight records and the output is a Map. A recommendation would be great (Besides creating new data types so that I don't have to sum tuples). Also any recommendations on performance optimization?

This is my code below

import cats.effect.{IO, IOApp}
import fs2.{Stream, text}
import fs2.io.file.{Files, Path}
import Utils._
import cats.Semigroup
import cats.syntax.all._
import cats.instances.map._
import cats.instances.vector._
import fs2.Chunk

object FS2Implementation extends IOApp.Simple {

case class SpeedRow(
    recordDate: Date,
    origin: String,
    destination: String,
    distance: Option[Double],
    airtime: Option[Double]
)

  val file = "src/main/FlightData/2018.csv"

  def stringToRows(string: String): SpeedRow = {
    val result = castToSpeedRow(string.split(",").map(_.trim).toVector)
    result
  }

  def getFileStream(path: String): Stream[IO, SpeedRow] =
    val filePath = Path(path)
    Files[IO].readUtf8Lines(filePath).drop(1).dropLast.map(stringToRows)

  def processStream(stream: Stream[IO, SpeedRow]) = {
    def createMapFromRow(
        row: SpeedRow
    ): Map[(String, String), Vector[(Double, Double)]] = Map(
      (row.origin, row.destination) -> Vector(
        (row.airtime.get, row.distance.get)
      )
    )
    val filteredStream =
      stream.filter(p => p.airtime != None && p.distance != None)
    val groupedBy = filteredStream
      .chunkN(10000, true)
      .map(c =>
        c.foldLeft(Map[(String, String), Vector[(Double, Double)]]())(
          (m, row) => m |+| createMapFromRow(row)
        )
      )
    groupedBy
  }

  override def run: IO[Unit] =
    val emptyMap = Map[(String, String), Vector[(Double, Double)]]()
    val vectorOfMaps =
      processStream(getFileStream(file)).compile.toVector
    val elem2 = for
      elem <- vectorOfMaps
      map <- IO(elem.foldLeft(emptyMap)(_ |+| _))
      mapValues <- IO(
        map.mapValues(w =>
          w.foldLeft((0.0, 0.0))((l, r) => (l._1 + r._1, l._2 + r._2))
        )
      )
    yield println(mapValues.toMap.size)
    elem2

}


def castToSpeedRow(row: Vector[String]): SpeedRow =
  val format = SimpleDateFormat("yyyy-MM-dd")
  val date = format.parse(row(0))
  val origin = row(3)
  val destination = row(4)
  val airTime = row(20).toDoubleOption
  val distance = row(21).toDoubleOption
  SpeedRow(date, origin, destination, distance, airTime)`
0

There are 0 answers