Spark is a general-purpose distributed data processing engine used for a variety of big data use cases, e.g. analysis of logs and event data for security, fraud detection and intrusion detection. Its core abstraction is the Resilient Distributed Dataset (RDD). The “resilience” comes from the lineage of a data structure, not from replication. Lineage means the set of operators applied to the original data structure. If a node fails, lineage and metadata are used to recover the lost data by recomputation.
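To make the lineage idea concrete, here is a toy model in plain Scala. It is only a sketch of the concept and not how Spark implements RDDs; `ToyDataset`, `transform` and `compute` are hypothetical names invented for this illustration.

```scala
// Toy model of lineage-based recovery. This is NOT Spark's implementation;
// ToyDataset and its members are hypothetical names used for illustration.
object LineageSketch {
  // A dataset is described by how to read its source partitions plus the
  // ordered list of operators (the lineage) applied to them.
  final case class ToyDataset(
      source: Int => Seq[Int],             // how to re-read partition i from stable storage
      lineage: List[Seq[Int] => Seq[Int]]  // operators applied so far, oldest first
  ) {
    // Record a new operator instead of touching any data.
    def transform(op: Seq[Int] => Seq[Int]): ToyDataset =
      copy(lineage = lineage :+ op)

    // Rebuild one partition from scratch by replaying the lineage on the source.
    def compute(partition: Int): Seq[Int] =
      lineage.foldLeft(source(partition))((data, op) => op(data))
  }

  def main(args: Array[String]): Unit = {
    val ds = ToyDataset(i => Seq(i * 10 + 1, i * 10 + 2, i * 10 + 3), Nil)
      .transform(_.map(_ * 2))
      .transform(_.filter(_ % 4 == 0))
    // Pretend the node holding partition 1 died. Nothing was replicated,
    // so the partition is rebuilt by replaying the recorded operators.
    println(ds.compute(1))
  }
}
```

The point of the sketch is that only the recipe has to survive a failure: as long as the source can be re-read, any lost partition can be recomputed on demand.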
The Spark word count example discussed in today’s meetup:
val textFile = sc.textFile("obama.txt")
val counts = textFile.flatMap(line => line.split(" ")).filter(_.length > 4).map(word => (word, 1)).reduceByKey(_ + _)
val sortedCounts = counts.map(_.swap).sortByKey(false)
sortedCounts.take(10)
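For readers without a Spark shell at hand, the same pipeline can be approximated on ordinary Scala collections. This is a local sketch, not the meetup code: `WordCountSketch` and `topWords` are hypothetical names, `groupBy` stands in for `reduceByKey`, and the tie-break on the word is an addition of mine to make the ordering deterministic.

```scala
object WordCountSketch {
  // Local stand-in for the RDD pipeline: same steps, ordinary Scala collections.
  def topWords(lines: Seq[String], minLength: Int, n: Int): Seq[(Int, String)] = {
    val counts = lines
      .flatMap(line => line.split(" "))    // one entry per word
      .filter(_.length > minLength)         // keep only the longer words
      .map(word => (word, 1))               // pair each word with a count of 1
      .groupBy(_._1)                        // local substitute for reduceByKey
      .map { case (word, pairs) => (word, pairs.map(_._2).sum) }
    counts.toSeq
      .map(_.swap)                          // (count, word), so the count becomes the key
      .sortBy(pair => (-pair._1, pair._2))  // descending by count, ties broken by word
      .take(n)
  }

  def main(args: Array[String]): Unit = {
    val lines = Seq("hello world hello", "world hello there spark")
    topWords(lines, 4, 3).foreach(println)
  }
}
```

The swap before sorting mirrors the original: `sortByKey` sorts on the first element of each pair, so the count has to be moved into that position.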
Spark is written in Scala, a language that supports functional programming and prefers immutable data structures. Sounds great! How are state changes done then? Through function calls. Recursion plays a bigger role because it lets state change from one call to the next: each call receives new values as arguments instead of overwriting old ones. The stack is used for the writes, rather than the heap. I recalled seeing a spiral Scala program earlier and found one on the web (the source is credited in the comments at the top of the listing). I modified it to draw the reverse spiral as well; the resulting code appears below. The takeaway is that functional programs are structured differently, and some things can be done more naturally. It feels closer to how the mind works. As long as one gets the base cases right, one can build up a large amount of complexity with little effort. On the other hand, if one has to start top down and debug a large call stack, it can be challenging.
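As a small illustration of state changing through function calls rather than mutation, here is a sketch that tallies words with an immutable `Map`. The names `ImmutableTally`, `tally` and `soFar` are made up for this example and do not come from the spiral program.

```scala
object ImmutableTally {
  // Count word occurrences without mutating anything: each call receives the
  // tally so far and hands an updated copy to the next call.
  def tally(words: List[String], soFar: Map[String, Int] = Map.empty): Map[String, Int] =
    words match {
      case Nil => soFar // base case: nothing left to count
      case word :: rest =>
        val updated = soFar.updated(word, soFar.getOrElse(word, 0) + 1)
        tally(rest, updated) // the "state change" is simply a new argument
    }

  def main(args: Array[String]): Unit =
    println(tally(List("a", "b", "a")))
}
```

No variable is ever reassigned here. The old map is left untouched, and the only thing that moves forward is the argument passed to the next call.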
// rt annotated spiral program.
// source http://www.kaiyin.co.vu/2015/10/draw-plain-text-spiral-in-scala.html
// reference: http://www.cis.upenn.edu/~matuszek/Concise%20Guides/Concise%20Scala.html
// syntax highlight: http://bsnyderblog.blogspot.com/2012/12/vim-syntax-highlighting-for-scala-bash.html
import java.io.{File, PrintWriter}
import scala.io.Source

object SpiralObj { // object keyword => a singleton object of a class defined implicitly by the same name
  object Element { // subclass. how is element a singleton ? there are several elements. has 3 subclasses which are not singletons
    private class ArrayElement( // subsubclass, not a singleton
      val contents: Array[String] // "primary constructor" is defined in class declaration, must be called
    ) extends Element

    private class LineElement(s: String) extends Element {
      val contents = Array(s)
    }

    private class UniformElement( // height and width of a line segment. what if we raise width to 2. works.
      ch: Char,
      override val width: Int, // override keyword is required to override an inherited method
      override val height: Int
    ) extends Element {
      private val line = ch.toString * width // fills the characters in a line
      def contents = Array.fill(height)(line) // duplicates line n(=height) times, to create a width*height rectangle
    }

    // three constructor like methods
    def elem(contents: Array[String]): Element = {
      new ArrayElement(contents)
    }
    def elem(s: String): Element = {
      new ArrayElement(Array(s))
    }
    def elem(chr: Char, width: Int, height: Int): Element = {
      new UniformElement(chr, width, height)
    }
  }

  abstract class Element {
    import Element.elem
    // contents to be implemented
    def contents: Array[String]
    def width: Int = contents(0).length
    def height: Int = contents.length

    // prepend this to that, so it appears above
    def above(that: Element): Element = { // above uses widen
      val this1 = this widen that.width
      val that1 = that widen this.width
      elem(this1.contents ++ that1.contents)
    }

    // prefix new bar line by line
    def beside(that: Element): Element = { // beside uses heighten
      val this1 = this heighten that.height
      val that1 = that heighten this.height
      elem(
        for ((line1, line2) <- this1.contents zip that1.contents) yield line1 + line2
      )
    }

    // add padding above and below
    def heighten(h: Int): Element = { // heighten uses above
      if (h <= height) this
      else {
        val top = elem(' ', width, (h - height) / 2)
        val bottom = elem(' ', width, h - height - top.height)
        top above this above bottom
      }
    }

    // add padding left and right
    def widen(w: Int): Element = { // widen uses beside
      if (w <= width) this
      else {
        val left = elem(' ', (w - width) / 2, height)
        val right = elem(' ', w - width - left.width, height)
        left beside this beside right
      }
    }

    override def toString = contents mkString "\n"
  }

  object Spiral {
    import Element._
    val space = elem("*")
    val corner1 = elem("/")
    val corner2 = elem("\\")

    def spiral(nEdges: Int, direction: Int): Element = { // clockwise spiral
      if (nEdges == 0) elem("+")
      else {
        //val sp = spiral(nEdges - 1, (direction + 1) % 4) // or (direction - 1) % 4, but we don't want negative numbers
        val sp = spiral(nEdges - 1, (direction + 3) % 4) // or (direction - 1) % 4, but we don't want negative numbers
        var verticalBar = elem('|', 1, sp.height) // vertBar and horizBar have last two params order switched
        var horizontalBar = elem('-', sp.width, 1)
        val thick = 1
        // at this stage, assume the n-1th spiral exists and you are adding another "line" to it (not a whole round)
        // use "above" and "beside" operators to attach the line to the spiral
        if (direction == 0) {
          horizontalBar = elem('r', sp.width, thick)
          (corner1 beside horizontalBar) above (sp beside space) // order is left to right
        } else if (direction == 1) {
          verticalBar = elem('d', thick, sp.height)
          (sp above space) beside (corner2 above verticalBar)
        } else if (direction == 2) {
          horizontalBar = elem('l', sp.width, thick)
          (space beside sp) above (horizontalBar beside corner1)
        } else {
          verticalBar = elem('u', thick, sp.height)
          (verticalBar above corner2) beside (space above sp)
        }
      }
    }

    def revspiral(nEdges: Int, direction: Int): Element = { // try counterclockwise
      if (nEdges == 0) elem("+")
      else {
        //val sp = spiral(nEdges - 1, (direction + 1) % 4) // or (direction - 1) % 4, but we don't want negative numbers
        val sp = revspiral(nEdges - 1, (direction + 3) % 4) // or (direction - 1) % 4, but we don't want negative numbers
        var verticalBar = elem('|', 1, sp.height) // vertBar and horizBar have last two params order switched
        var horizontalBar = elem('-', sp.width, 1)
        val thick = 1
        // at this stage, assume the n-1th spiral exists and you are adding another "line" to it (not a whole round)
        if (direction == 0) { // right
          horizontalBar = elem('r', sp.width, thick)
          (sp beside space) above (corner2 beside horizontalBar)
        } else if (direction == 1) { // up
          verticalBar = elem('u', thick, sp.height)
          (space above sp) beside (verticalBar above corner1)
        } else if (direction == 2) { // left
          horizontalBar = elem('l', sp.width, thick)
          (horizontalBar beside corner2) above (space beside sp)
        } else { // down
          verticalBar = elem('d', thick, sp.height)
          (corner1 above verticalBar) beside (sp above space)
        }
      }
    }

    def draw(n: Int): Unit = {
      println()
      println(spiral(n, n % 4)) // %4 returns 0,1,2,3 . right, down, left, up
      println()
      println(revspiral(n, n % 4)) // %4 returns 0,1,2,3
    }
  }
}

object Main {
  // explicit ": Unit =" replaces the deprecated procedure syntax "def f() { ... }"
  def usage(): Unit = {
    print("usage: scala Main szInt")
  }

  def main(args: Array[String]): Unit = {
    import SpiralObj._
    if (args.length > 0) {
      val spsize = args(0)
      Spiral.draw(spsize.toInt)
    } else {
      usage()
      println()
    }
  }
}
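A note on tail-call recursion. If the last action of a function is a call to another function, the called function returns to the same place the calling function would have returned to. The caller's stack frame is therefore no longer needed and can be reused by the called function. When that final call is to the function itself, the function is tail recursive and the effect is that of a loop: a series of function calls can be made without consuming additional stack space. The Scala compiler applies this optimization to self-recursive tail calls, and the @tailrec annotation makes it report an error when it cannot.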
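A minimal sketch of the difference, using a sum from 1 to n. The names `TailCallSketch`, `sumNaive` and `sumTail` are illustrative only; `scala.annotation.tailrec` is the standard library annotation.

```scala
import scala.annotation.tailrec

object TailCallSketch {
  // Not tail recursive: the addition happens after the recursive call returns,
  // so every call needs its own stack frame.
  def sumNaive(n: Long): Long =
    if (n == 0) 0 else n + sumNaive(n - 1)

  // Tail recursive: the recursive call is the last action, with the running
  // total carried in `acc`. @tailrec makes the compiler reject the method if
  // it cannot compile the call into a loop.
  @tailrec
  def sumTail(n: Long, acc: Long = 0): Long =
    if (n == 0) acc else sumTail(n - 1, acc + n)

  def main(args: Array[String]): Unit = {
    println(sumNaive(100L))
    println(sumTail(1000000L)) // a depth the naive version may not survive
  }
}
```

Moving the pending work (the addition) into an accumulator argument is the usual way to turn the first shape into the second. Whether `sumNaive` actually overflows at a given depth depends on the JVM stack size, so the sketch only calls it with a small input.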