Lambdacurry

Effective way to build production-quality Hadoop jobs (with Mahout)


As I started working on building usable data analytics with Hadoop, I saw that there were several query languages out there, Pig and Hive being the primary ones. My goal is to define a reasonable framework for experimentation on Hadoop: the most important metric is deriving insights from data, and the least important is playing with cool new languages. (Which, incidentally, fits the final conclusion.)

Hive seemed to be the closest fit to the relational model and so serves as a good starting point. A very interesting post by Alan Gates @Yahoo made me realize that for most of the tasks out there, the big challenge is taking data and massaging it into a form usable for big-data analysis (for example, creating Vectors for Mahout's recommenders).

Pig was the next point of experimentation, especially with its ability to work with JRuby UDFs.

register ./contrib/piggybank/java/piggybank.jar;
register /home/user/Code/hadoop/hbase-0.94.4/lib/zookeeper-3.4.5.jar;
register /home/user/Code/hadoop/hbase-0.94.4/hbase-0.94.4.jar;
register /home/user/Code/hadoop/hbase-0.94.4/lib/guava-11.0.2.jar;
register /home/user/Code/hadoop/hbase-0.94.4/lib/protobuf-java-2.4.0a.jar;
register ./udf.rb using jruby as uuid_udf;

/* Earlier attempts at pulling the log through the distributed cache:
DEFINE access_log_link CACHE('hdfs://localhost:54310/user/user/access_log#access_log_link');
set mapred.cache.localFiles 'hdfs://localhost:54310/user/user/access_log#access_log.1';
set mapred.create.symlink yes;
*/

DEFINE LogLoader org.apache.pig.piggybank.storage.apachelog.CommonLogLoader();
DEFINE access_log_link2 CACHE('hdfs://localhost:54310/user/user/access_log#access_log_link');

-- Load the Apache access log and prepend a UUID key generated by the JRuby UDF
log = LOAD 'hdfs://localhost:54310/user/user/access_log' USING LogLoader as (remoteAddr, remoteLogname, user, time, method, uri, proto, status, bytes);
log_id = FOREACH log GENERATE uuid_udf.uuid(), remoteAddr, remoteLogname, user, time, method, uri, proto, status, bytes;
/* Variant with the extra combined-log fields, using a composite key built without the UDF:
log = LOAD 'hdfs://localhost:54310/user/user/access_log' USING LogLoader as (remoteAddr, remoteLogname, user, time, method, uri, proto, status, bytes, referer, userAgent);
log_id = FOREACH log GENERATE CONCAT(time, CONCAT(remoteAddr, referer)), remoteAddr, remoteLogname, user, time, method, uri, proto, status, bytes;
*/
/* dump log_id; */
store log_id into 'hbase://access' USING org.apache.pig.backend.hadoop.hbase.HBaseStorage('log:remoteAddr log:remoteLogname log:user log:time log:method log:uri log:proto log:status log:bytes');
require 'pigudf'
require 'java' # Magic line for JRuby - Java interworking
import java.util.UUID

class JRubyUdf < PigUdf
  outputSchema "uuid:chararray"
  def uuid()
    java.util.UUID.randomUUID().toString()
  end

  outputSchema "type:chararray"
  def type(auditRecord)
    recordPrefix, recordData = auditRecord.split(':')
    # Build a hash of all the prefix fields: node, type and msg
    prefixFields = Hash[*recordPrefix.split(' ').map { |t| t.split('=') }.flatten]
    prefixFields['type'] # return the type
  end
end

This runs fine… on your laptop. The Pig problems start when you actually try to run the job on a cluster. That is when all the hidden details hit you: JARs missing from the classpath, JARs missing from the distributed cache, and a dozen small things that add up to real pain. They hit you even harder when you combine more than one toolkit, for example Hadoop + HBase + Mahout.

That said, one very interesting point of integration that I stumbled upon was embedding Pig inside Python (via Jython) or JavaScript. (BTW, a lot of this comes via Hortonworks, which is doing interesting things around Hadoop and Pig.) That seemed very promising; however, notice the rather quirky piece of code in there:

P = Pig.compile("""register udf.jar
                   DEFINE find_centroid FindCentroid('$centroids');

Essentially, you write a UDF in Python, then call Pig from Python; the Pig script in turn calls the UDF. All of this finally gets compiled on the fly into a JAR (because that's what Pig does) and shipped to Hadoop. This was the step that convinced me that this tooling was unnecessarily complex and that there ought to be a simpler way.

I realized that Pig and Hive are actually step two in the whole process of implementing MapReduce on Hadoop.

Step one has got to be writing your own custom jobs in Java or Scala and creating standalone job JARs.

Yes, this is suboptimal, since you end up with large JARs (a simple job JAR with Hadoop and Mahout ended up at 32MB). Remember that these JARs will be shipped to your cluster (and hopefully persisted in the distributed cache for subsequent runs), but when you are starting out you don't really need to worry about that optimization. Secondly, at a later stage you can still use this very workflow to create a non-standalone JAR and instead push the dependent libraries out by using one of the following (a small driver sketch follows the list):

  • the -Dmapred.cache.files option, to force distributed caching of the libraries,
  • the handy -libjars option, or
  • the HADOOP_CLASSPATH environment variable.
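
To make the second point concrete, here is a minimal sketch (my own, not from the original post; the class name and job wiring are made up) of a Scala driver written against the classic Hadoop API. Because it runs through ToolRunner, the generic options such as -libjars and -D settings are parsed for you and the listed JARs are shipped to the distributed cache automatically.

import org.apache.hadoop.conf.{Configuration, Configured}
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
import org.apache.hadoop.util.{Tool, ToolRunner}

// Hypothetical driver: running through ToolRunner means -libjars,
// -files and -D options on the command line are handled for us.
class LogVectorDriver extends Configured with Tool {
  override def run(args: Array[String]): Int = {
    val job = new Job(getConf, "log-vector-job")
    job.setJarByClass(classOf[LogVectorDriver])

    // Identity job for illustration; plug in real Mapper/Reducer classes here.
    job.setOutputKeyClass(classOf[LongWritable])
    job.setOutputValueClass(classOf[Text])

    FileInputFormat.addInputPath(job, new Path(args(0)))
    FileOutputFormat.setOutputPath(job, new Path(args(1)))

    if (job.waitForCompletion(true)) 0 else 1
  }
}

object LogVectorDriver {
  def main(args: Array[String]): Unit = {
    // ToolRunner strips the generic options (-libjars, -D, -files) before
    // handing the remaining arguments to run().
    val exitCode = ToolRunner.run(new Configuration(), new LogVectorDriver, args)
    sys.exit(exitCode)
  }
}

You would then run something like hadoop jar myjob.jar LogVectorDriver -libjars mahout-core.jar,mahout-math.jar /input /output (the JAR names here are purely illustrative).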

My Scala code here is a simple Mahout vector-creation job (based on sample data from here) and it is packaged as a standalone JAR (using sbt assembly).
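
The original gist is not reproduced above, but the shape of such a job is roughly the following sketch (my own reconstruction, with made-up sample data and output path). It turns each record into a Mahout Vector and writes the result as a SequenceFile of VectorWritable values, which is the input format many Mahout jobs consume.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.io.{SequenceFile, Text}
import org.apache.mahout.math.{DenseVector, VectorWritable}

object VectorCreationJob {
  def main(args: Array[String]): Unit = {
    val conf = new Configuration()
    val fs = FileSystem.get(conf)
    // Hypothetical output location; pass it in as args(0) in a real run.
    val output = new Path(if (args.nonEmpty) args(0) else "vectors/part-00000")

    // Made-up sample data: one dense vector per record.
    val samples = Seq(
      "rec1" -> Array(1.0, 0.0, 2.0),
      "rec2" -> Array(0.0, 3.0, 1.0),
      "rec3" -> Array(4.0, 1.0, 0.0)
    )

    // Mahout jobs read vectors as <Text, VectorWritable> sequence files.
    val writer = SequenceFile.createWriter(fs, conf, output,
      classOf[Text], classOf[VectorWritable])
    try {
      for ((key, values) <- samples) {
        val vector = new DenseVector(values)
        writer.append(new Text(key), new VectorWritable(vector))
      }
    } finally {
      writer.close()
    }
  }
}

Packaging this with the sbt-assembly plugin bundles Hadoop, Mahout and your own classes into one fat JAR — the 32MB standalone JAR mentioned above — which you can then hand straight to hadoop jar.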



Published June 17, 2013

Find me on Twitter @sandeepssrin

Did I make any mistake? Please consider sending a pull request.