Were pipelines removed from Akka I/O?


While learning how to use Akka I/O, I am trying to implement a simple protocol on top of Akka I/O, following the documentation here.

However, in my Gradle file I use version 2.3.9, as shown below:

dependencies {
    compile group: 'org.slf4j', name: 'slf4j-log4j12', version: '1.7.7'
    compile group: 'com.typesafe.akka', name: 'akka-actor_2.11', version: '2.3.9'
    compile group: 'com.typesafe.akka', name: 'akka-contrib_2.11', version: '2.3.9'
    compile group: 'org.scala-lang', name: 'scala-library', version: '2.11.5'
    testCompile group: 'junit', name: 'junit', version: '4.11'
}

Imports of pipeline-specific classes, such as

import akka.io.SymmetricPipelineStage;
import akka.io.PipelineContext;
import akka.io.SymmetricPipePair;

generate "cannot resolve symbol" errors.

Hence my questions:

  1. Were these classes removed, or is there some dependency I need to add to my Gradle file?
  2. If they were removed, how would the encode/decode stage be dealt with?

There are 2 answers

Eric Zoerner (Best Answer)

Pipelines were experimental and indeed removed in Akka 2.3. The removal was documented in the Migration Guide 2.2.x to 2.3.x.

There is also mention of being able to package the "older" pipeline implementation with Akka 2.3 here, though it doesn't appear to be a simple addition of a dependency.

I would wager that Akka Streams is intended to be the replacement for pipelines; it is coming in Akka 2.4, but it is available now as an experimental module. The encode/decode stage or protocol layer can be handled by using Akka Streams in conjunction with Akka I/O, as sketched below.
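
A minimal sketch, assuming the experimental akka-stream module and a newline-delimited protocol (the host, port, and frame limit below are illustrative, not from the original answer). The framing stage plays roughly the role a decode pipeline stage used to, and the trailing map is the encode stage:

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Framing, Tcp}
import akka.util.ByteString

object StreamsFramingSketch extends App {

  implicit val system       = ActorSystem("framing")
  implicit val materializer = ActorMaterializer()

  // The "pipeline" as a Flow: a decode stage (delimiter framing + UTF-8)
  // followed by an encode stage, composed with plain combinators.
  val protocol = Flow[ByteString]
    .via(Framing.delimiter(ByteString("\n"), maximumFrameLength = 1024, allowTruncation = true))
    .map(_.utf8String)                    // decode: ByteString => String
    .map(line => ByteString(line + "\n")) // encode: String => ByteString

  // Handle every incoming TCP connection with the protocol flow (an echo server).
  Tcp().bind("127.0.0.1", 8888).runForeach { connection =>
    connection.handleWith(protocol)
  }
}

Because the protocol logic lives in a reusable Flow, each connection gets the same framing behavior without any per-connection bookkeeping in the protocol code itself.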

Eugene Zhulkov

Yes, pipelines were removed without a drop-in alternative. I came from the Netty world and don't find pipelines "unintuitive": they accumulate buffers and supply child actors with ready-to-use messages.

Take a look at our solution; it requires "org.scalaz" %% "scalaz-core" % "7.2.14" as a dependency.

The Codec class is a State monad that the actor calls to produce output. In our projects we use Varint32 protobuf encoding, so every message is prepended with a varint32 length field (for example, a 300-byte payload gets the two-byte prefix 0xAC 0x02):

import com.google.protobuf.CodedInputStream
import com.trueaccord.scalapb.{GeneratedMessage, GeneratedMessageCompanion, Message}
import com.zeptolab.tlc.front.codecs.Varint32ProtoCodec.ProtoMessage

import scalaz.{-\/, State, \/, \/-}

trait Accumulator

trait Codec[IN, OUT] {

  // A state transition over the accumulator that emits fully decoded messages.
  type Stream = State[Accumulator, Seq[IN]]

  // Decode a single, complete frame.
  def decode(buffer: Array[Byte]): Throwable \/ IN

  // Serialize a message, including its length prefix.
  def encode(message: OUT): Array[Byte]

  def emptyAcc: Accumulator

  // Decode a raw network chunk; partial frames are carried in the accumulator.
  def decodeStream(data: Array[Byte]): Stream

}

object Varint32ProtoCodec {

  type ProtoMessage[T] = GeneratedMessage with Message[T]

  def apply[IN <: ProtoMessage[IN], OUT <: ProtoMessage[OUT]](protoType: GeneratedMessageCompanion[IN]) = new Varint32ProtoCodec[IN, OUT](protoType)

}

class Varint32ProtoCodec[IN <: ProtoMessage[IN], OUT <: ProtoMessage[OUT]](protoType: GeneratedMessageCompanion[IN]) extends Codec[IN, OUT] {

  import com.google.protobuf.CodedOutputStream

  // expected: total frame length (varint prefix + payload); -1 = not yet known.
  private case class AccumulatorImpl(expected: Int = -1, buffer: Array[Byte] = Array.empty) extends Accumulator

  override def emptyAcc: Accumulator = AccumulatorImpl()

  override def decode(buffer: Array[Byte]): Throwable \/ IN = {
    \/.fromTryCatchNonFatal {
      // Read the varint32 length prefix, then parse the payload that follows it.
      val dataLength = CodedInputStream.newInstance(buffer).readRawVarint32()
      val bufferLength = buffer.length
      // Dropping (bufferLength - dataLength) bytes strips the varint prefix.
      val dataBuffer = buffer.drop(bufferLength - dataLength)
      protoType.parseFrom(dataBuffer)
    }
  }

  override def encode(message: OUT): Array[Byte] = {
    val messageBuf = message.toByteArray
    val messageBufLength = messageBuf.length
    // Compute how many bytes the varint32 prefix occupies, write the length
    // into a buffer of exactly that size, then prepend it to the payload.
    val prependLength = CodedOutputStream.computeUInt32SizeNoTag(messageBufLength)
    val prependLengthBuffer = new Array[Byte](prependLength)
    CodedOutputStream.newInstance(prependLengthBuffer).writeUInt32NoTag(messageBufLength)
    prependLengthBuffer ++ messageBuf
  }

  override def decodeStream(data: Array[Byte]): Stream = State {
    case acc: AccumulatorImpl =>
      if (data.isEmpty) {
        (acc, Seq.empty)
      } else {
        // Append the new chunk to whatever was buffered from earlier reads.
        val accBuffer = acc.buffer ++ data
        val accExpected = readExpectedLength(accBuffer, acc)
        // accExpected stays -1 while the varint prefix itself is incomplete;
        // guard against it so we keep buffering instead of splitting at -1.
        if (accExpected >= 0 && accBuffer.length >= accExpected) {
          // A complete frame is available: split it off and decode it.
          val (frameBuffer, restBuffer) = accBuffer.splitAt(accExpected)
          val output = decode(frameBuffer) match {
            case \/-(proto) => Seq(proto)
            case -\/(_)     => Seq.empty // a malformed frame is dropped
          }
          // Recurse on the remainder, which may hold further complete frames.
          val (newAcc, recOutput) = decodeStream(restBuffer).run(emptyAcc)
          (newAcc, output ++ recOutput)
        } else (AccumulatorImpl(accExpected, accBuffer), Seq.empty)
      }
    case _ => (emptyAcc, Seq.empty)
  }

  // Total frame length = payload length + size of the varint prefix itself.
  // Returns acc.expected unchanged while the prefix is still incomplete.
  private def readExpectedLength(data: Array[Byte], acc: AccumulatorImpl) = {
    if (acc.expected == -1 && data.length >= 1) {
      \/.fromTryCatchNonFatal {
        val is = CodedInputStream.newInstance(data)
        val dataLength = is.readRawVarint32()
        val prefixLength = is.getTotalBytesRead // bytes consumed by the varint
        dataLength + prefixLength
      }.getOrElse(acc.expected)
    } else acc.expected
  }

}
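
A hypothetical driver for the codec (chunk1 and chunk2 are illustrative byte arrays, not from the original answer): each network read goes through decodeStream, and the returned accumulator is threaded into the next call, so a frame may span reads:

// Hypothetical usage: thread the accumulator through consecutive chunks.
val codec = Varint32ProtoCodec[Upstream, Downstream](Upstream)

val (acc1, msgs1) = codec.decodeStream(chunk1).run(codec.emptyAcc) // maybe a partial frame
val (acc2, msgs2) = codec.decodeStream(chunk2).run(acc1)           // completes it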

And the Actor is:

import akka.actor.{Actor, ActorRef, Props}
import akka.event.Logging
import akka.util.ByteString
import com.zeptolab.tlc.front.codecs.{Accumulator, Varint32ProtoCodec}
import com.zeptolab.tlc.proto.protocol.{Downstream, Upstream}

object FrameCodec {
  def props() = Props[FrameCodec]
}

class FrameCodec extends Actor {

  import akka.io.Tcp._

  private val logger       = Logging(context.system, this)
  private val codec        = Varint32ProtoCodec[Upstream, Downstream](Upstream)
  private val sessionActor = context.actorOf(Session.props())

  def receive = {
    case r: Received =>
      // On the first chunk, switch to the streaming state with a fresh
      // accumulator, then re-deliver the message to be processed there.
      context become stream(sender(), codec.emptyAcc)
      self ! r
    case PeerClosed => peerClosed()
  }

  private def stream(ioActor: ActorRef, acc: Accumulator): Receive = {
    case Received(data) =>
      // Run the codec over the new chunk, forward every complete message,
      // and carry the updated accumulator into the next state.
      val (next, output) = codec.decodeStream(data.toArray).run(acc)
      output.foreach { up =>
        sessionActor ! up
      }
      context become stream(ioActor, next)
    case d: Downstream =>
      // Outbound messages are length-prefixed and written back to the socket.
      val buffer = codec.encode(d)
      ioActor ! Write(ByteString(buffer))
    case PeerClosed => peerClosed()
  }

  private def peerClosed() = {
    logger.info("Connection closed")
    context stop self
  }

}
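
For completeness, a hedged sketch of how such a codec actor is typically wired to Akka I/O; the Listener actor and the port below are assumptions for illustration, not part of the original code:

import java.net.InetSocketAddress

import akka.actor.Actor
import akka.io.{IO, Tcp}

class Listener extends Actor {

  import Tcp._
  import context.system

  // Ask the TCP manager to bind; it replies with Bound or CommandFailed.
  IO(Tcp) ! Bind(self, new InetSocketAddress("0.0.0.0", 9000))

  def receive = {
    case Bound(_)               => () // now accepting connections
    case CommandFailed(_: Bind) => context stop self
    case Connected(_, _) =>
      // One FrameCodec per connection; Register routes Received/PeerClosed to it.
      val handler = context.actorOf(FrameCodec.props())
      sender() ! Register(handler)
  }
}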