Spark is a general-purpose distributed data processing engine that is used for for variety of big data use cases – e.g. analysis of logs and event data for security, fraud detection and intrusion detection. It has the notion of Resilient Distributed Datasets. The “resilience” has to do with lineage of a datastructure, not to replication. Lineage means the set of operators applied to the original datastructure. Lineage and metadata are used to recover lost data, in case of node failures, using recomputation.
Spark word count example discussed in today’s meetup.
val textfile = sc.textFile("obama.txt")
val counts = textFile.flatMap(line=>line.split(" ")).filter(_.length>4).map(word=>(word,1)).reduceByKey(_+_)
val sortedCounts = counts.map(_.swap).sortByKey(false)
sortedCounts.take(10)
Scala is a functional programming language which is used in Spark. It prefers immutable datastructures. Sounds great! How are state changes done then ? Through function calls. Recursion has a bigger role to play because it is a way for state changes to happen via function calls. The stack is utilized for the writes, rather than the heap. I recalled seeing a spiral scala program earlier and found one here on the web. Modified it to find the reverse spiral. Here’s the resulting code. The takeaway is that functional programs are structured differently – one could do some things more naturally. It feels closer to how the mind works. As long as one get the base cases right, one can build large amount of complexity trivially. On the other hand, if one has to start top down and must debug a large call stack, it could be challenging.
// rt annotated spiral program.
// source http://www.kaiyin.co.vu/2015/10/draw-plain-text-spiral-in-scala.html
// reference: http://www.cis.upenn.edu/~matuszek/Concise%20Guides/Concise%20Scala.html
// syntax highlight: http://bsnyderblog.blogspot.com/2012/12/vim-syntax-highlighting-for-scala-bash.html
import java.io.{File, PrintWriter}
import scala.io.Source
object SpiralObj { // object keyword => a singleton object of a class defined implicitly by the same name
object Element { // subclass. how is element a singleton ? there are several elements. has 3 subclasses which are not singetons
private class ArrayElement( // subsubclass, not a singleton
val contents: Array[String] // "primary constructor" is defined in class declaration, must be called
) extends Element
private class LineElement(s: String) extends Element {
val contents = Array(s)
}
private class UniformElement( // height and width of a line segment. what if we raise width to 2. works.
ch: Char,
override val width: Int, // override keyword is required to override an inherited method
override val height: Int
) extends Element {
private val line = ch.toString * width // fills the characters in a line
def contents = Array.fill(height)(line) // duplicates line n(=height) times, to create a width*height rectangle
}
// three constructor like methods
def elem(contents: Array[String]): Element = {
new ArrayElement(contents)
}
def elem(s: String): Element = {
new ArrayElement(Array(s))
}
def elem(chr: Char, width: Int, height: Int): Element = {
new UniformElement(chr, width, height)
}
}
abstract class Element {
import Element.elem
// contents to be implemented
def contents: Array[String]
def width: Int = contents(0).length
def height: Int = contents.length
// prepend this to that, so it appears above
def above(that: Element): Element = { // above uses widen
val this1 = this widen that.width
val that1 = that widen this.width
elem(this1.contents ++ that1.contents)
}
// prefix new bar line by line
def beside(that: Element): Element = { // beside uses heighten
val this1 = this heighten that.height
val that1 = that heighten this.height
elem(
for ((line1, line2) <- this1.contents zip that1.contents)
yield line1 + line2
)
}
// add padding above and below
def heighten(h: Int): Element = { // heighten uses above
if (h <= height) this
else {
val top = elem(' ', width, (h - height) / 2)
val bottom = elem(' ', width, h - height - top.height)
top above this above bottom
}
}
// add padding left and right
def widen(w: Int): Element = { // widen uses beside
if (w <= width) this
else {
val left = elem(' ', (w - width) / 2, height)
val right = elem(' ', w - width - left.width, height)
left beside this beside right
}
}
override def toString = contents mkString "\n"
}
object Spiral {
import Element._
val space = elem("*")
val corner1 = elem("/")
val corner2 = elem("\\")
def spiral(nEdges: Int, direction: Int): Element = { // clockwise spiral
if(nEdges == 0) elem("+")
else {
//val sp = spiral(nEdges - 1, (direction + 1) % 4) // or (direction - 1) % 4, but we don't want negative numbers
val sp = spiral(nEdges - 1, (direction + 3) % 4) // or (direction - 1) % 4, but we don't want negative numbers
var verticalBar = elem('|', 1, sp.height) // vertBar and horizBar have last two params order switched
var horizontalBar = elem('-', sp.width, 1)
val thick = 1
// at this stage, assume the n-1th spiral exists and you are adding another "line" to it (not a whole round)
// use "above" and "beside" operators to attach the line to the spiral
if(direction == 0) {
horizontalBar = elem('r', sp.width, thick)
(corner1 beside horizontalBar) above (sp beside space) // order is left to right
}else if(direction == 1) {
verticalBar = elem('d',thick, sp.height)
(sp above space) beside (corner2 above verticalBar)
} else if(direction == 2) {
horizontalBar = elem('l', sp.width, thick)
(space beside sp) above (horizontalBar beside corner1)
} else {
verticalBar = elem('u',thick, sp.height)
(verticalBar above corner2) beside (space above sp)
}
}
}
def revspiral(nEdges: Int, direction: Int): Element = { // try counterclockwise
if(nEdges == 0) elem("+")
else {
//val sp = spiral(nEdges - 1, (direction + 1) % 4) // or (direction - 1) % 4, but we don't want negative numbers
val sp = revspiral(nEdges - 1, (direction + 3) % 4) // or (direction - 1) % 4, but we don't want negative numbers
var verticalBar = elem('|', 1, sp.height) // vertBar and horizBar have last two params order switched
var horizontalBar = elem('-', sp.width, 1)
val thick = 1
// at this stage, assume the n-1th spiral exists and you are adding another "line" to it (not a whole round)
if(direction == 0) { // right
horizontalBar = elem('r', sp.width, thick)
(sp beside space) above (corner2 beside horizontalBar)
}else if(direction == 1) { // up
verticalBar = elem('u',thick, sp.height)
(space above sp) beside (verticalBar above corner1)
} else if(direction == 2) { // left
horizontalBar = elem('l', sp.width, thick)
(horizontalBar beside corner2 ) above (space beside sp)
} else { // down
verticalBar = elem('d',thick, sp.height)
(corner1 above verticalBar) beside (sp above space)
}
}
}
def draw(n: Int): Unit = {
println()
println(spiral(n, n % 4)) // %4 returns 0,1,2,3 . right, down, left, up
println()
println(revspiral(n, n % 4)) // %4 returns 0,1,2,3
}
}
}
object Main {
def usage() {
print("usage: scala Main szInt");
}
def main(args: Array[String]) {
import SpiralObj._
if(args.length > 0) {
val spsize = args(0)
Spiral.draw(spsize.toInt)
} else {
usage()
println()
}
}
}
A note on tail-call recursion. If the last statement of function is a call to another function, then the return position of the called function is the same as that of the calling function. The current stack position is valid for the called function. Such a function is tail recursive and the effect is that of a loop – a series of function calls can be made without consuming stack space.