paulcichonski.com

whatever I have been thinking about lately

Simple Timer Abstraction in Scala

I’ve been digging into Scala a lot more recently, and one of the things I love is the support for first-class functions with a syntax that is less verbose than Java’s (I think it is even better than Java 8’s lambdas). For example, I was working on some test code to measure the latency of various network operations. This is not production code, so I didn’t need anything as powerful as Coda Hale’s Metrics library (which I love for production); I just needed a way to quickly time things in code.

In Scala this is as simple as creating a function like this:

  def time(execution: () => Unit) {
    val start = System.currentTimeMillis()
    execution()
    val time = System.currentTimeMillis() - start
    println("execution time: " + time + "ms")
  }

This is a higher-order function that times the execution of whatever function gets passed to it. If you are coming from Java it should look pretty straightforward; the only major difference is the function argument declaration execution: () => Unit, which declares the argument execution to have the type () => Unit. In Scala the type annotation comes after the variable name, with the two separated by a :. The type in this case describes a function that takes zero arguments (the () denotes the empty parameter list) and returns nothing useful (Unit is roughly Scala’s equivalent of Java’s void).
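To make that type a bit more concrete, here is a tiny example of my own (sayHello is just an illustrative name) showing what a value of type () => Unit looks like; this is exactly the shape of argument that time expects:

// a zero-argument function that returns nothing useful, i.e. a value of type () => Unit
val sayHello: () => Unit = () => println("hello")

// invoking it runs the body
sayHello()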

Below are some examples of using this function:

scala> time(() => println("sum of range 0 to 100000000: " + (0 to 100000000).reduce((a, b) => a + b)))
sum of range 0 to 100000000: 987459712
execution time: 3246ms

scala> time(() => Thread.sleep(500))
execution time: 500ms

Writing code like this is only possible thanks to the ability to treat functions as objects. While this won’t revolutionize the way you code, it does allow you to start removing the boilerplate that tends to build up in Java.
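As a small aside that goes beyond the original snippet, Scala also supports by-name parameters, which trim the call-site boilerplate even further; here is a rough sketch of a variant timer (timeBlock is my name for it) built on that feature:

def timeBlock[T](block: => T): T = {
  // block is a by-name parameter: it is not evaluated until it is referenced below,
  // so the caller can pass a bare block of code instead of a () => ... literal
  val start = System.currentTimeMillis()
  val result = block // the caller's code actually runs here
  println("execution time: " + (System.currentTimeMillis() - start) + "ms")
  result
}

// call site: no explicit function literal needed
timeBlock { Thread.sleep(500) }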

This is only the tip of the iceberg of functional programming in Scala; if you want to learn more, check out the free Scala by Example book available on the Scala website.

Learning Hadoop: WebHdfsFileSystem vs FileSystem

For the last few weeks I’ve had the chance to start digging into the Hadoop ecosystem, focusing mainly on Spark for distributed compute (both batch and streaming) as well as HDFS for data storage. The first thing I noticed is how massive the Hadoop ecosystem is. The word Hadoop refers to many different technologies that users may deploy in many different ways to solve specific Big Data use cases. Referring to Hadoop is similar to using a phrase like IT Security: it is very ambiguous until you start digging into the specifics of the deployment.

Enough of the high-level talk though; what I really want to talk about is the pain I experienced just trying to get data in and out of HDFS. Most of the pain was self-inflicted, since the mental model I brought to the problem came from over a year of working with Cassandra, which is a much simpler system for storing data, although it does not provide as good a foundation for storing raw data in a lambda-architecture style design. In Cassandra you have the cluster and you have the client, where the client is your application and it speaks to the cluster over the network in a fairly typical client-server model.

I quickly discovered that my map was not the territory when I started writing some simple code for sending data into HDFS:

package data.generator

import com.twitter.elephantbird.mapreduce.io.ProtobufBlockWriter
import data.generator.DataUtil.TestMessage
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.io.compress

import scala.compat.Platform
import scalaz.EphemeralStream

object DataWriterTest {

  def main(args: Array[String]) {
    writeSomeData("hdfs://192.168.50.25:50010", 1000000, "/tmp/test.snappy")
  }

  def writeSomeData(hdfsURI: String, numberOfMessages: Int, destPath: String) {
    def getSnappyWriter(): ProtobufBlockWriter[TestMessage] = {
      val conf = new Configuration()
      val fs = FileSystem.get(new Path(hdfsURI).toUri, conf)
      val outputStream = fs.create(new Path(destPath), true)
      val codec = new compress.SnappyCodec()
      codec.setConf(conf);
      val snappyOutputStream = codec.createOutputStream(outputStream)
      new ProtobufBlockWriter[TestMessage](snappyOutputStream, classOf[TestMessage])
    }
    def getMessage(messageId: Long): TestMessage = DataUtil.createRandomTestMessage(messageId);
    // produces a lazy stream of messages
    def getMessageStream(messageId: Long): EphemeralStream[TestMessage] = {
      val messageStream: EphemeralStream[TestMessage] = getMessage(messageId) ##:: getMessageStream(messageId + 1)
      return messageStream
    }
    val writer = getSnappyWriter()
    getMessageStream(1).takeWhile(message => message.getMessageId < numberOfMessages)
      .foreach(message => writer.write(message))
    writer.finish()
    writer.close()
  }
}

As far as I could tell, this pattern for writing was consistent with most of the tutorials I found via Google; the only new thing I added was the use of Twitter’s Elephant Bird library to write Snappy-compressed Protocol Buffer data. So I was surprised when I saw (and kept seeing) the following errors:

Client Errors:

Exception in thread "main" java.io.IOException: Failed on local exception: java.io.EOFException; Host Details : local host is: ".local/192.168.1.2"; destination host is: "localhost.localdomain":50010;
  at org.apache.hadoop.net.NetUtils.wrapException(NetUtils.java:764)
  at org.apache.hadoop.ipc.Client.call(Client.java:1413)
  at org.apache.hadoop.ipc.Client.call(Client.java:1362)
  at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:206)
  at com.sun.proxy.$Proxy9.create(Unknown Source)
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:606)
  at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:186)
  at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
  at com.sun.proxy.$Proxy9.create(Unknown Source)
  at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.create(ClientNamenodeProtocolTranslatorPB.java:258)
  at org.apache.hadoop.hdfs.DFSOutputStream.newStreamForCreate(DFSOutputStream.java:1598)
  at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1460)
  at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1385)
  at org.apache.hadoop.hdfs.DistributedFileSystem$6.doCall(DistributedFileSystem.java:394)
  at org.apache.hadoop.hdfs.DistributedFileSystem$6.doCall(DistributedFileSystem.java:390)
  at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
  at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:390)
  at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:334)
  at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:906)
  at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:887)
  at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:784)
  at data.generator.DataWriterTest$.getSnappyWriter$1(DataWriterTest.scala:23)
  at data.generator.DataWriterTest$.writeSomeData(DataWriterTest.scala:35)
  at data.generator.DataWriterTest$.main(DataWriterTest.scala:16)
  at data.generator.DataWriterTest.main(DataWriterTest.scala)
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:606)
  at com.intellij.rt.execution.application.AppMain.main(AppMain.java:134)
Caused by: java.io.EOFException
  at java.io.DataInputStream.readInt(DataInputStream.java:392)
  at org.apache.hadoop.ipc.Client$Connection.receiveRpcResponse(Client.java:1053)
  at org.apache.hadoop.ipc.Client$Connection.run(Client.java:948)

Server Errors (HDFS DataNode log):

ERROR org.apache.hadoop.hdfs.server.datanode.DataNode: localhost.localdomain:50010:DataXceiver error processing unknown operation  src: /192.168.50.2:56439 dest: /192.168.50.25:50010
java.io.IOException: Version Mismatch (Expected: 28, Received: 26738 )
  at org.apache.hadoop.hdfs.protocol.datatransfer.Receiver.readOp(Receiver.java:57)
  at org.apache.hadoop.hdfs.server.datanode.DataXceiver.run(DataXceiver.java:206)
  at java.lang.Thread.run(Thread.java:744)

After about 8 hours of googling and of ensuring that all client and server jars were the same version, my surprise turned to frustration, especially since the error messages were so vague.

Finally, after a few circuits at the gym, I started from scratch and read through all the documentation I could find about typical data-loading strategies for HDFS (something I should have done to begin with). This led me to the realization that my client-server mental model was flawed in the HDFS context, since HDFS makes no assumptions about where the data is being written from (in fact, it seems to assume that the client is local to the cluster). Some quick exploration of the org.apache.hadoop.fs.FileSystem class hierarchy showed that there are a variety of ways to write to HDFS, and they do not all speak the same wire protocol. So with a little refactoring to use the org.apache.hadoop.hdfs.web.WebHdfsFileSystem implementation, my code works just fine:

Note the new webhdfs:// scheme in the URI and the new port, 50070. There seems to be a tight coupling between the protocol scheme, the FileSystem implementation, and the port mapping, but I have not yet found good documentation describing exactly what that coupling is (I poke at this a bit more at the end of the post).

package data.generator

import java.net.URI

import com.twitter.elephantbird.mapreduce.io.ProtobufBlockWriter
import data.generator.DataUtil.TestMessage
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.hdfs.web.WebHdfsFileSystem
import org.apache.hadoop.io.compress

import scala.compat.Platform
import scalaz.EphemeralStream

object DataWriterTest {

  def main(args: Array[String]) {
    writeSomeData("webhdfs://192.168.50.25:50070", 1000000, "/tmp/test2.snappy")
  }

  def writeSomeData(hdfsURI: String, numberOfMessages: Int, destPath: String) {
    def getSnappyWriter(): ProtobufBlockWriter[TestMessage] = {
      val conf = new Configuration()
      // explicitly use the WebHDFS (HTTP) implementation of FileSystem
      val fs = new WebHdfsFileSystem()
      fs.initialize(new Path(hdfsURI).toUri, conf)
      val outputStream = fs.create(new Path(destPath), true)
      val codec = new compress.SnappyCodec()
      codec.setConf(conf)
      val snappyOutputStream = codec.createOutputStream(outputStream)
      new ProtobufBlockWriter[TestMessage](snappyOutputStream, classOf[TestMessage])
    }
    def getMessage(messageId: Long): TestMessage = DataUtil.createRandomTestMessage(messageId)
    // produces a lazy stream of messages
    def getMessageStream(messageId: Long): EphemeralStream[TestMessage] = {
      val messageStream: EphemeralStream[TestMessage] = getMessage(messageId) ##:: getMessageStream(messageId + 1)
      messageStream
    }
    val writer = getSnappyWriter()
    getMessageStream(1).takeWhile(message => message.getMessageId < numberOfMessages)
      .foreach(message => writer.write(message))
    writer.finish()
    writer.close()
  }
}

The resulting file in HDFS:

$ hdfs dfs -ls -h /tmp
Found 1 item
-rw-r--r--   3 pcichonski supergroup    215.9 M 2014-07-19 08:44 /tmp/test2.snappy

I’m still not sure this is the right method for writing large amounts of data to HDFS, but more dots are starting to connect in my head about how the component parts of the ecosystem fit together. There is lots more to learn though.
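As a closing experiment of my own (not something from the original debugging session), one way to poke at that scheme-to-implementation coupling is to let the FileSystem factory resolve the implementation from the URI scheme instead of instantiating WebHdfsFileSystem directly; with the Hadoop 2.x client jars on the classpath I would expect webhdfs:// to resolve to WebHdfsFileSystem, though I have not checked this across versions:

import java.net.URI

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem

object SchemeCheck {
  def main(args: Array[String]) {
    val conf = new Configuration()
    // ask the FileSystem factory which implementation backs the webhdfs scheme;
    // the host and port mirror the working example above
    val fs = FileSystem.get(new URI("webhdfs://192.168.50.25:50070"), conf)
    println("webhdfs:// resolved to " + fs.getClass.getName)
  }
}

If that prints org.apache.hadoop.hdfs.web.WebHdfsFileSystem, the writer above could presumably drop the explicit instantiation and rely on the URI scheme alone.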

Meetup Talks

I had the opportunity to speak at two Meetups in January on the work I’ve been doing for the past few months at Lithium. The first talk was at a new Meetup that Lithium started called Cloudops, which is oriented towards engineers working in devops roles in cloud environments. My talk was a short 15-minute one (slides here) covering strategies for building fault-tolerant systems in cloud environments and my thoughts on how to monitor those systems. My favorite part of the talk was introducing Boyd’s OODA loop as a process model for devops work. I dealt a lot with the OODA loop in my past role at NIST, when we were trying to improve the process for IT security incident handling across the industry. I still think OODA is one of the better process models out there, since it is abstract enough to remain relevant across many disciplines while still offering value in each individual domain that binds to it.

My second talk was at the Datastax Cassandra SF Meetup hosted at Disqus. This talk (slides here) was a bit more low-level, focusing on how we have been using Cassandra at Lithium for the past six months as we move to a more service-oriented architecture internally. It covered our use case, our data model, and all of the issues we dealt with getting Cassandra into production. I also covered the strategy we used for migrating data from MySQL to Cassandra with zero downtime, which was heavily influenced by a Netflix blog post about a migration from SimpleDB to Cassandra.

Setting Up Octopress on a New Machine

As soon as I got this blog up and running, the first thing I did was to completely screw up my Octopress install (apparently I did need all those files I deleted), thus rendering the entire site useless. As with anything “tinkering” related, this is fairly normal so it is also fairly normal to rebuild from scratch. Luckily, smart people have already figured out how to do this.

The following steps outline how to set up a fresh Octopress install that connects to an existing Github Pages repository (this builds on the zerosharp post, with some updates based on recent changes in Octopress). These steps assume that the Github repository is fully up to date with all the latest changes.

First you need to make sure that the source/_posts and source/stylesheets directories are actually committed to your source branch. You also need to make sure .gitignore is not ignoring either of them (if there is a reason they should not be committed, please let me know; I could not think of one).

## This needs to happen on your first machine; for some reason these directories are ignored by default.
cd source/
git add _posts/ 
git add stylesheets
git commit -m "adding posts and stylesheets dir"
git push origin source

The remainder of the steps happen on your second machine. First, clone the source branch:

git clone -b source git@github.com:username/username.github.io.git octopress
cd octopress

Next, install Octopress and run the Github Pages setup:

gem install bundler
rbenv rehash    # If you use rbenv, rehash to be able to run the bundle command
bundle install
rake setup_github_pages

The last command deletes the _deploy directory and re-creates it. We don’t want this, because we need the latest changes in _deploy; otherwise we will run into nasty [rejected] master -> master (non-fast-forward) git errors caused by an out-of-date branch. So remove _deploy and re-clone it from the Github repo:

rm -rf _deploy
git clone git@github.com:username/username.github.io.git _deploy

Octopress should now be set up, and the source dir should contain your up-to-date markdown. To test things out, make a change to a post (or create a new post), then regenerate and deploy:

rake generate
rake deploy

Your new changes should appear on your site. The only downside of this approach is that when you go back to your original machine and try to deploy, git will yell at you (i.e., a non-fast-forward error) because the _deploy dir on that machine is now out of date. The best way to fix this is to remove and re-clone it, as shown above. The other option is to edit octopress/Rakefile and change the line system "git push origin #{deploy_branch}" to system "git push origin +#{deploy_branch}" to force the deployment despite the version mismatch (be sure to undo this change immediately afterwards).

Building This Blog With Octopress

I just setup this blog using Octopress as the page generation engine and Github Pages to freely host the content. In total, it took me about four hours to get from start to first blog post, and that was mainly because my ruby environment on my dev machine was fairly messed up from mucking around six months ago.

This initial post is both a how-to for setting up a blog with Octopress/Github and a quick cheat sheet so I don’t forget how I did things.

Setup

The Octopress setup docs are incredibly helpful, so I’m not going to duplicate content explaining what everything means. Assuming Ruby is correctly installed, the setup commands are as follows:

## 1. Create personal github repo named "username.github.io", mine is "paulcichonski.github.io"
## 2. Clone Octopress repo locally and setup directory structure
git clone git://github.com/imathis/octopress.git octopress
cd octopress
rake install
## 3. connect your local install with your git repo (see Octopress docs for more details: http://octopress.org/docs/deploying/github/)
rake setup_github_pages

The above commands will leave your ‘/octopress’ dir in a state where you are ready to begin blogging. You can think of the ‘/octopress’ dir as a container for all your blog content as well as the library for all the commands you need to push content to Github.

Important Octopress Files and Directories

The following files and directories comprise the essential building blocks for an Octopress site:

  • /octopress/_config.yml – Holds all the configuration for the site; Octopress uses this when generating your static HTML pages from markdown.
  • /octopress/source/ – The directory containing all the content you actually edit; it syncs with the ‘source’ branch of your Github repository. Remember to always commit any changes made in this directory to your ‘source’ branch. Running “rake generate” from the ‘/octopress’ dir takes the files in ‘source’, parses them in combination with your ‘_config.yml’ file, and generates static HTML content into the ‘_deploy’ dir.
  • /octopress/_deploy – The directory that Octopress generates dynamically; it contains the actual static HTML that people see. When you run “rake deploy”, Octopress pushes all content from ‘_deploy’ to the ‘master’ branch of your Github repo (i.e., the branch responsible for serving content).

Important Octopress Commands

The following commands are the most useful for doing basic things with Octopress:

## generates static html by parsing markdown in the 'source' directory, using '_config.yml' for configuration parameters
rake generate
## deploys all content in '_deploy' to the 'master' branch on your github repo
rake deploy
## binds a webserver to localhost:4000 that serves pages from '_deploy'; watches for changes in 'source' and auto-generates all changes into '_deploy'. Use this for local development.
rake preview

That is it for now; I still need to document how to do development from multiple machines and how to rebuild a local development workstation if something goes wrong.