Spark and Scala

Spark is a general-purpose distributed data processing engine used for a variety of big data use cases, e.g. analysis of logs and event data for security, fraud detection and intrusion detection. It is built around the notion of Resilient Distributed Datasets (RDDs). The “resilience” comes from the lineage of a data structure, not from replication. Lineage is the sequence of operators applied to the original data structure. When a node fails, Spark uses the lineage and metadata to recover the lost data by recomputing it.
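To make the lineage idea concrete, here is a minimal sketch in plain Scala. It is a toy model, not Spark's API: ToyDataset, source, lineage and compute are hypothetical names chosen for illustration, and real RDDs track far more metadata. The point is only that a lost partition can be rebuilt by replaying the recorded operators on the original data.

```scala
// Toy model of lineage-based recovery. NOT Spark's API: all names here
// (ToyDataset, source, lineage, compute) are hypothetical.
object LineageSketch {
  // A dataset is described by where its raw data comes from plus the ordered
  // operators applied to it, rather than by a replicated copy of the result.
  final case class ToyDataset(
      source: Int => Seq[Int],               // reloads the raw data of one partition
      lineage: Vector[Seq[Int] => Seq[Int]]  // operators recorded so far
  ) {
    // Transformations only extend the lineage; nothing is computed yet.
    def map(f: Int => Int): ToyDataset =
      copy(lineage = lineage :+ ((part: Seq[Int]) => part.map(f)))

    def filter(p: Int => Boolean): ToyDataset =
      copy(lineage = lineage :+ ((part: Seq[Int]) => part.filter(p)))

    // Compute (or recompute after a loss) one partition by replaying the lineage.
    def compute(partition: Int): Seq[Int] =
      lineage.foldLeft(source(partition))((data, op) => op(data))
  }

  def main(args: Array[String]): Unit = {
    val raw = Map(0 -> Seq(1, 2, 3), 1 -> Seq(4, 5, 6))
    val ds = ToyDataset(p => raw(p), Vector.empty).map(_ * 10).filter(_ > 10)

    // Pretend both partitions were computed and held on worker nodes.
    var held = Map(0 -> ds.compute(0), 1 -> ds.compute(1))

    // The node holding partition 1 fails: its computed data is gone.
    held = held - 1

    // Recovery needs no replica, only the source and the lineage.
    held = held + (1 -> ds.compute(1))
    println(held(1))
  }
}
```

Only the lost partition is recomputed; partition 0 is left untouched.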

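The Spark word count example discussed in today’s meetup:

// sc is the SparkContext provided by the Spark shell
val textFile = sc.textFile("obama.txt")
// split lines into words, keep words longer than 4 characters, count each word
val counts = textFile.flatMap(line => line.split(" ")).filter(_.length > 4).map(word => (word, 1)).reduceByKey(_ + _)
// swap to (count, word) pairs and sort by count, descending
val sortedCounts = counts.map(_.swap).sortByKey(false)
// the 10 most frequent words
sortedCounts.take(10)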
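For readers without a cluster at hand, the same pipeline can be approximated on ordinary Scala collections. This is a sketch for following the steps, not Spark code: LocalWordCount and topWords are hypothetical names, groupBy plus size stands in for reduceByKey, and a descending sortBy stands in for sortByKey(false).

```scala
// Local, single-machine approximation of the word count pipeline.
// LocalWordCount and topWords are hypothetical names, not part of Spark.
object LocalWordCount {
  def topWords(lines: Seq[String], minLength: Int, n: Int): Seq[(Int, String)] = {
    val counts: Map[String, Int] =
      lines
        .flatMap(line => line.split(" "))  // one flat sequence of words
        .filter(_.length > minLength)      // drop short words
        .groupBy(identity)                 // word -> all its occurrences
        .map { case (word, occurrences) => (word, occurrences.size) }

    counts.toSeq
      .map(_.swap)                           // (word, count) -> (count, word)
      .sortBy { case (count, _) => -count }  // highest count first
      .take(n)
  }

  def main(args: Array[String]): Unit = {
    val lines = Seq("hello world hello", "world hello tiny")
    topWords(lines, minLength = 4, n = 10).foreach(println)
  }
}
```

The difference in Spark is that each step runs per partition across the cluster, and reduceByKey combines counts locally before shuffling them between nodes.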

Scala is the language Spark is written in. It blends object-oriented and functional programming and prefers immutable data structures. Sounds great! How are state changes done then? Through function calls. Recursion has a bigger role to play because it is a way for state changes to happen via function calls: each call receives the new state as its arguments, so the writes land on the stack rather than the heap.

I recalled seeing a spiral Scala program earlier and found one on the web (the source is credited in the code comments). I modified it to also draw the reverse spiral; the resulting code is below. The takeaway is that functional programs are structured differently, and some things can be expressed more naturally. It feels closer to how the mind works. As long as one gets the base cases right, one can build a large amount of complexity with little effort. On the other hand, if one has to start top down and debug a large call stack, it can be challenging.
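As a small aside before the full spiral listing, the idea that state changes happen through function calls can be sketched as follows. StateViaRecursion, sumLoop and sumRec are hypothetical names for illustration; the two functions compute the same sum, one by overwriting a variable and one by passing the remaining work to a new call.

```scala
// Contrast between mutating a variable and passing state through calls.
// All names here are hypothetical, chosen for illustration only.
object StateViaRecursion {
  // Imperative version: one mutable variable is overwritten in place.
  def sumLoop(xs: List[Int]): Int = {
    var total = 0
    for (x <- xs) total += x
    total
  }

  // Functional version: nothing is reassigned. Each call receives the
  // remaining list as a fresh, immutable argument on a new stack frame.
  def sumRec(xs: List[Int]): Int = xs match {
    case Nil          => 0                    // base case: empty list
    case head :: tail => head + sumRec(tail)  // "state change" = a new call
  }

  def main(args: Array[String]): Unit = {
    val xs = List(1, 2, 3, 4)
    println(sumLoop(xs))
    println(sumRec(xs))
  }
}
```

The spiral program has the same shape: the base case is a single "+" and every recursive step attaches one more edge to the result of the previous call.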

// rt annotated spiral program.
// source http://www.kaiyin.co.vu/2015/10/draw-plain-text-spiral-in-scala.html
// reference: http://www.cis.upenn.edu/~matuszek/Concise%20Guides/Concise%20Scala.html
// syntax highlight: http://bsnyderblog.blogspot.com/2012/12/vim-syntax-highlighting-for-scala-bash.html
import java.io.{File, PrintWriter}
import scala.io.Source

object SpiralObj {   // object keyword => a singleton: a class and its only instance, defined together
  object Element {   // companion object of the abstract class Element below: holds the factory methods and the private subclasses
    private class ArrayElement(  // private subclass of Element (a regular class, not a singleton)
                                val contents: Array[String]  // "primary constructor" is defined in the class declaration
                                ) extends Element
    private class LineElement(s: String) extends Element {
      val contents = Array(s)
    }
    private class UniformElement(  // a width x height rectangle filled with one character; widths above 1 also work
                                  ch: Char,
                                  override val width: Int,   // override is required because Element already defines width and height
                                  override val height: Int
                                  ) extends Element {
      private val line = ch.toString * width  // one row: ch repeated width times
      def contents = Array.fill(height)(line) // repeats the row height times, giving a width*height rectangle
    }
    // three factory methods; callers never see the concrete subclasses
    def elem(contents: Array[String]): Element = {
      new ArrayElement(contents)
    }
    def elem(s: String): Element = {
      new ArrayElement(Array(s))
    }
    def elem(chr: Char, width: Int, height: Int): Element = {
      new UniformElement(chr, width, height)
    }
  }


  abstract class Element {
    import Element.elem
    // contents to be implemented
    def contents: Array[String]

    def width: Int = contents(0).length

    def height: Int = contents.length

    // prepend this to that, so it appears above
    def above(that: Element): Element = {      // above uses widen
      val this1 = this widen that.width
      val that1 = that widen this.width
      elem(this1.contents ++ that1.contents)
    }

    // place this to the left of that, joining the two line by line
    def beside(that: Element): Element = {     // beside uses heighten
      val this1 = this heighten that.height
      val that1 = that heighten this.height
      elem(
        for ((line1, line2) <- this1.contents zip that1.contents)
          yield line1 + line2
      )
    }

    // add padding above and below
    def heighten(h: Int): Element = {          // heighten uses above
      if (h <= height) this
      else {
        val top = elem(' ', width, (h - height) / 2)
        val bottom = elem(' ', width, h - height - top.height)
        top above this above bottom
      }
    }

    // add padding left and right
    def widen(w: Int): Element = {             // widen uses beside
      if (w <= width) this
      else {
        val left = elem(' ', (w - width) / 2, height)
        val right = elem(' ', w - width - left.width, height)
        left beside this beside right
      }
    }

    override def toString = contents mkString "\n"
  }


  object Spiral {
    import Element._
    val space = elem("*")
    val corner1 = elem("/")
    val corner2 = elem("\\")
    def spiral(nEdges: Int, direction: Int): Element = { // clockwise spiral
      if(nEdges == 0) elem("+")
      else {
        // (direction + 3) % 4 is (direction - 1) mod 4 without going negative;
        // the alternative turn would be spiral(nEdges - 1, (direction + 1) % 4)
        val sp = spiral(nEdges - 1, (direction + 3) % 4)
        var verticalBar = elem('|', 1, sp.height)        // elem(chr, width, height): width 1, full height
        var horizontalBar = elem('-', sp.width, 1)       // full width, height 1
        val thick = 1                                    // thickness of the edge being added
        // at this stage, assume the n-1th spiral exists and you are adding another "line" to it (not a whole round)
        // use "above" and "beside" operators to attach the line to the spiral
        if(direction == 0) {
          horizontalBar = elem('r', sp.width, thick)
          (corner1 beside horizontalBar) above (sp beside space) //  order is left to right
        }else if(direction == 1) {
          verticalBar = elem('d',thick, sp.height)
          (sp above space) beside (corner2 above verticalBar)
        } else if(direction == 2) {
          horizontalBar = elem('l', sp.width, thick)
          (space beside sp) above (horizontalBar beside corner1)
        } else {
          verticalBar = elem('u',thick, sp.height)
          (verticalBar above corner2) beside (space above sp)
        }
      }
    }

    def revspiral(nEdges: Int, direction: Int): Element = { // try counterclockwise
      if(nEdges == 0) elem("+")
      else {
        // (direction + 3) % 4 is (direction - 1) mod 4 without going negative
        val sp = revspiral(nEdges - 1, (direction + 3) % 4)
        var verticalBar = elem('|', 1, sp.height)        // elem(chr, width, height): width 1, full height
        var horizontalBar = elem('-', sp.width, 1)       // full width, height 1
        val thick = 1                                    // thickness of the edge being added
        // at this stage, assume the n-1th spiral exists and you are adding another "line" to it (not a whole round)
        if(direction == 0) { // right
          horizontalBar = elem('r', sp.width, thick)
          (sp beside space) above (corner2 beside horizontalBar)
        }else if(direction == 1) { // up
          verticalBar = elem('u',thick, sp.height)
          (space above sp) beside (verticalBar above corner1)
        } else if(direction == 2) { // left
          horizontalBar = elem('l', sp.width, thick)
          (horizontalBar beside corner2 ) above (space beside sp) 
        } else { // down
          verticalBar = elem('d',thick, sp.height)
          (corner1 above verticalBar) beside (sp above space)
        }
      }
    }
    def draw(n: Int): Unit = {
      println()
      println(spiral(n, n % 4))  // %4 returns 0,1,2,3 .    right, down, left, up
      println()
      println(revspiral(n, n % 4))  // %4 returns 0,1,2,3   
    }
  }
}

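object Main {
  // explicit ": Unit =" replaces the deprecated procedure syntax "def f() { ... }"
  def usage(): Unit = {
    println("usage: scala Main szInt")
  }

  def main(args: Array[String]): Unit = {
    import SpiralObj._
    if (args.length > 0) {
      val spsize = args(0)
      Spiral.draw(spsize.toInt)   // throws NumberFormatException if the argument is not an integer
    } else {
      usage()
    }
  }
}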


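A note on tail-call recursion. If the last action of a function is a call to another function, the return position of the called function is the same as that of the calling function, so the current stack frame can be reused for the called function. When that last call is to the function itself, the function is tail recursive and the effect is that of a loop: a series of function calls can be made without consuming additional stack space. The Scala compiler applies this optimization to self-recursive tail calls, and the @tailrec annotation makes it report an error when the recursive call is not in tail position.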
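A minimal sketch of the difference, using the standard scala.annotation.tailrec annotation. The object and method names (TailCallSketch, sumToNaive, sumTo) are hypothetical.

```scala
import scala.annotation.tailrec

// Hypothetical example: summing 1..n with and without a tail call.
object TailCallSketch {
  // Not a tail call: after the recursive call returns, the addition still
  // has to run, so every call keeps its stack frame alive.
  def sumToNaive(n: Long): Long =
    if (n == 0L) 0L else n + sumToNaive(n - 1)

  // Tail call: the recursive call is the last thing the method does, with
  // the running total carried in the accumulator parameter acc.
  // @tailrec makes the compiler reject the method if it cannot compile
  // the recursion into a loop.
  @tailrec
  def sumTo(n: Long, acc: Long = 0L): Long =
    if (n == 0L) acc else sumTo(n - 1, acc + n)

  def main(args: Array[String]): Unit = {
    println(sumToNaive(100L))  // fine for small inputs
    println(sumTo(1000000L))   // a million iterations in constant stack space
  }
}
```

Calling sumToNaive with a very large n risks a StackOverflowError, while sumTo runs as a loop. The spiral and revspiral functions above are not tail recursive, since each one still combines elements after its recursive call returns.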
