Friday, April 3, 2015

Bill Gates 2030 Vision

I recently watched this video, in which The Verge interviewed Bill Gates about his vision for 2030. This is the man who predicted that every home would have a PC, which turned out to be true, so what does Mr. Gates think the world will look like fifteen years from now?

He names four key areas for improvement: health care, farming, banking, and education.

Key takeaways:
  • This man is serious about his goals/visions; each sector has very specific goals, and coming up with goals/visions this concrete is very hard
  • Health
    • Upstream: inventing new vaccines, specifically for kids less than five years old
    • Downstream: how do you get them out to kids around the world
    • Goal: currently one out of twenty kids dies before the age of 5; this should improve to one in forty
  • Farming
    • Better seeds with resistance to heat & low water, which hints at GMOs, but at least it means educating farmers about the benefits
    • Improved credit & loan systems for farmers
    • Increase world food productivity 
  • Education 
    • Emphasis on online learning 
    • Improve critical programming skills
    • Basics of reading & writing 
    • Tablet computing should have a sandbox to test new code  
  • Banking
    • In my view the banking vision is radical, and I see it coming soon; on small transactions banks are losing money
    • Banking's digital infrastructure should be revised to create a utility-type service that lets you move money to pay someone else or switch from one bank to another. This calls for a new regulatory infrastructure where the money-transfer system is licensed and accounts are portable between banks, like phone numbers are between carriers (switching bank accounts should be like switching phone services)

Monday, December 8, 2014

Hadoop Definitive Guide Byte Sized Notes - 6

Chapter 7 MapReduce Types and Formats

General form of map & reduce 
  • map: (k1,v1) -> list(k2,v2)
  • reduce: (k2,list(v2)) ->list(k3,v3)
  • Keys emitted during the map phase must implement WritableComparable so that the keys can be sorted during the shuffle and sort phase.
  • Reduce input must have the same types as the map output
  • The number of map tasks is not set explicitly; it equals the number of splits that the input is turned into
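The two general forms above can be sketched in plain Python (illustrative only, not Hadoop code), using a hypothetical max-temperature example where each input line looks like "year,temp":

```python
from typing import Iterable

# map: (k1, v1) -> list(k2, v2)
# Here k1 is a line offset (int), v1 a line of text (str),
# k2 a year (str), v2 a temperature (int).
def map_fn(offset: int, line: str) -> list:
    year, temp = line.split(",")
    return [(year, int(temp))]

# reduce: (k2, list(v2)) -> list(k3, v3)
# All values for a key arrive together, here reduced with max().
def reduce_fn(year: str, temps: Iterable) -> list:
    return [(year, max(temps))]

print(map_fn(0, "1950,22"))            # [('1950', 22)]
print(reduce_fn("1950", [22, 0, 11]))  # [('1950', 22)]
```

Note how the reduce input key/value types match the map output types, as the bullet above requires.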
Input formats 
  • Data is passed to the mapper by the InputFormat; an InputFormat is a factory for RecordReader objects, which extract the (key, value) pairs from the input source
  • Each mapper deals with single input split no matter the file size
  • Input Split is the chunk of the input processed by a single map, represented by InputSplit
  • InputSplits are created by the InputFormat; FileInputFormat is the default for file-based input
  • Can process text, XML, and binary input
  • The RecordReader must ensure each (key, value) pair is processed
  • Ensure no (k, v) is processed more than once
  • Handle (k, v) pairs that span split boundaries
Output formats: Text output, binary output 

Chapter 8 MapReduce Features

  • Counters are useful for QC and problem diagnostics
  • Two built-in counter groups: task & job
    • Task Counter
      • gather information about task-level statistics over execution time
      • count info like input records, skipped records, output bytes, etc.
      • they are sent across the network
    • Job Counter
      • measure job level statistics
    • User defined counters 
      • Counters are defined by a Java enum
      • Counters are global
  • SortOrder controlled by RawComparator
  • Partial sort, secondary sort (Lab)
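The user-defined counter behavior above (a Java enum, incremented per task, aggregated globally by the framework) can be simulated with a minimal Python sketch; the enum name and counts are illustrative, not from Hadoop:

```python
from enum import Enum
from collections import Counter

# In Hadoop, user-defined counters are declared as a Java enum; each task
# increments its counters locally and the framework sums them into a
# global, job-wide total. Two "tasks" are simulated here as Counters.
class Records(Enum):
    MISSING = "MISSING"
    MALFORMED = "MALFORMED"

task1 = Counter({Records.MISSING: 3})
task2 = Counter({Records.MISSING: 1, Records.MALFORMED: 2})

# Global aggregation: Counter addition sums per-task counts.
global_counters = task1 + task2
print(global_counters[Records.MISSING])    # 4
print(global_counters[Records.MALFORMED])  # 2
```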

To run a map-side join use CompositeInputFormat from the org.apache.hadoop.mapreduce.join package

Side Data: read-only data required to process the main data set, e.g. skip words for word count

Distributed Cache: To save network bandwidth, side data files are normally copied to any particular node once per job.

Map-only jobs: image processing, ETL, SELECT * FROM table, distcp, i/p data sampling


A map/reduce job will, most of the time, contain more than one reducer. So when a mapper emits a key-value pair, it has to be sent to one of the reducers. Which one? The mechanism sending specific key-value pairs to specific reducers is called partitioning (the key-value pair space is partitioned among the reducers). A Partitioner is responsible for performing the partitioning.
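Hadoop's default is HashPartitioner, which buckets by the key's hash modulo the number of reducers. A minimal Python sketch of the same idea (Python's hash() stands in for Java's hashCode(), so the actual bucket numbers differ from Hadoop's):

```python
def partition(key: str, num_reducers: int) -> int:
    # Mimics HashPartitioner: (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks.
    # Masking keeps the value non-negative before taking the modulus.
    return (hash(key) & 0x7FFFFFFF) % num_reducers

# The essential property: equal keys always go to the same reducer,
# so all values for a key meet in one place.
assert partition("apple", 4) == partition("apple", 4)
buckets = {k: partition(k, 4) for k in ["apple", "banana", "cherry"]}
print(buckets)  # each key mapped to a reducer index in 0..3
```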

For more info this article has detailed explanation

Number of reducers
  • Zero reducers: only map tasks
  • One reducer: when the amount of data is small enough to be processed comfortably by one reducer
Combiner
  • Used for reducing intermediate data
  • Runs locally on a single mapper's o/p
  • Best suited for commutative and associative operations
  • Code identical to a Reducer
  • conf.setCombinerClass(Reducer.class);
  • May or may not run on the output from some or all mappers
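A minimal Python sketch (not Hadoop API) of why the combiner needs commutative and associative operations: aggregating locally per mapper and then reducing globally must give the same answer as reducing all raw pairs at once.

```python
# Two mappers' raw (word, count) output; values are illustrative.
mapper1 = [("word", 1), ("word", 1), ("other", 1)]
mapper2 = [("word", 1)]

def combine(pairs):
    # Sums values per key; addition is commutative and associative,
    # so this function works as both combiner and reducer.
    out = {}
    for k, v in pairs:
        out[k] = out.get(k, 0) + v
    return list(out.items())

# Without combiner: reduce over all raw pairs.
direct = combine(mapper1 + mapper2)
# With combiner: combine per mapper, then reduce the combined output.
combined = combine(combine(mapper1) + combine(mapper2))
assert sorted(direct) == sorted(combined)
print(sorted(combined))  # [('other', 1), ('word', 3)]
```

This is also why the combiner "may or may not run": since pre-aggregation cannot change the final answer, the framework is free to skip it.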

Hadoop Definitive Guide Byte Sized Notes - 5

Chapter 6 Anatomy of a MapReduce Job Run

The framework used for execution is set by the mapreduce.framework.name property:

  • local -> local job runner
  • classic -> MR1
  • yarn -> MR2

MR1 - Classic

  • Client submits MR job
  • The JT (jobtracker) coordinates the job run
  • The JT resides on the master node
  • The TT (tasktracker) is actually responsible for instantiating map/reduce tasks

Map Task

  • May completely choose to ignore the input key
  • Map outputs zero or more K/V pairs
  • Typical mappers 
    • Convert to Upper Case
    • Explode Mapper 
    • Filter Mapper
    • Changing Keyspace

Reduce Task

  • All values associated with a particular intermediate key are guaranteed to go to the same reducer
  • Intermediate keys and values are passed to reducer in sorted key order  
  • Typical Reducers
    • Sum Reducer
    • Identity Reducer 

Progress in MR1 means

  • Reading an input record (in a mapper or reducer)
  • Writing an output record (in a mapper or reducer)
  • Setting the status description on a reporter (using Reporter's setStatus() method)
  • Incrementing a counter (using Reporter's incrCounter() method)
  • Calling Reporter's progress() method
Handling Job/Failures in MR1

Mainly three failure modes

  • Failure to run task
    • If a task (map/reduce) produces a runtime error, the TT signals the JT to notify it of the failed task attempt; the JT will retry the task, 4 times by default
    • The number of attempts is set by a configuration property
    • After the attempts are exhausted the JT will abort the task and report failure
    • A task attempt may also be killed, which is different from failing, during speculative execution task attempt is killed
    • Killed task attempts do not count against the number of attempts to run the task
  • Failure of task tracker
    • If a TT is running slow or crashes, it won't send heartbeat signals to the JT
    • If the JT does not receive a heartbeat signal for 10 min, it reschedules the failed TT's map/reduce tasks on a different TT
    • A TT is blacklisted if the number of faults is over some minimum threshold (the default is four)
  • Failure of JT
    • After restarting a jobtracker, any jobs that were running at the time it was stopped need to be resubmitted
Limitations of MR1

  • JT single point of failure
  • JT is a bottleneck: Hive, Pig, and MR all get converted to MR jobs and submitted to the JT
  • The JT does job scheduling & task progress monitoring, and restarts failed tasks


YARN - MR2

  • YARN splits the JT into two daemons, the Resource Manager (RM) & the Application Master (AM), plus Node Managers; configured in yarn-site.xml
  • The AM negotiates with the RM for cluster resources; the RM allocates them, and Node Managers, running on the data nodes, ensure the application uses only its allocated resources
  • MR has a dedicated AM which runs for the duration of the app

Failure in YARN

Mainly four failure modes

  • Failure to run task
    • hanging/failed tasks reported to AM 
  • Failure in Application Master
    • Apps in YARN are tried multiple times in the event of failure
    • AM sends periodic heartbeats to the RM, if RM detects failure, a new AM is started
    • The client polls the application master for progress reports
  • Failure in Node Manager
    • It will stop sending heartbeats to the resource manager
    • The node manager will be removed from the resource manager's pool (after 10 min by default)
    • Node managers may be blacklisted if the number of failures for the application is high; blacklisting is done by the application master, not the resource manager
  • Failure in Resource Manager
    • SPOF, may be recovered from saved state. The state consists of the node managers in the system as well as the running applications

Job Scheduling

  • Default: scheduling users' jobs with a FIFO queue-based scheduler
  • Multi user -Fair & Capacity

Fair Scheduler

  • Every user has a fair share of the cluster
  • Supports preemption
  • Scheduler will kill tasks in pools running over capacity in order to give more slots to the pool running under capacity

Capacity Scheduler 
Simulate a separate MapReduce cluster with FIFO scheduling for each user or organization. Within each queue, jobs are scheduled using FIFO scheduling

Shuffle and Sort

Shuffle (Map): the process by which the system performs the sort and transfers the map outputs to the reducers as inputs is known as the shuffle.

Map Side 

  • Map outputs are written to a circular memory buffer; a background thread sorts the keys in memory, and when the buffer reaches 80% full the thread spills the contents to disk
  • The combiner, if configured, can run before the spill
  • It's better to compress the output of the map phase
  • The output partitions are transferred over HTTP to the reducers
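The buffer-and-spill behavior above can be modeled with a toy Python sketch (buffer size and records are made up; 0.8 mirrors the default spill threshold of Hadoop's io.sort.spill.percent):

```python
# Toy model of the map-side buffer: records accumulate in memory and,
# once the buffer passes the spill threshold, they are sorted by key
# and "spilled" as one sorted run.
BUFFER_CAPACITY = 5
SPILL_THRESHOLD = 0.8

buffer, spills = [], []

def emit(pair):
    buffer.append(pair)
    if len(buffer) >= BUFFER_CAPACITY * SPILL_THRESHOLD:
        spills.append(sorted(buffer))  # background thread sorts, then spills
        buffer.clear()

for pair in [("b", 1), ("a", 1), ("c", 1), ("a", 1), ("b", 1)]:
    emit(pair)

print(spills)  # [[('a', 1), ('a', 1), ('b', 1), ('c', 1)]] - sorted spill file
print(buffer)  # [('b', 1)] - still in memory, not yet spilled
```

Each spill file being internally sorted is what lets the reduce side merge them cheaply instead of re-sorting everything.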

Reduce Side 

  • How do reducers know which machines to fetch map output from?
    • After the map output is completed, the TT notifies the JT (AM)
    • The JT (AM) keeps a log of host:map_output
    • A thread in the reducer asks the JT (AM) for host:map_output
    • Once it has host:map_output, the copy phase begins: map outputs are copied over HTTP to the reducer
    • Map outputs are deleted only when the JT (AM) tells the TTs to delete them

Sort (Reduce)

  • Merges the map outputs, maintaining their sort ordering
  • Ex: with 50 map outputs and a merge factor of 10, we get 5 intermediate files
  • Finally reduce function is invoked for each key in the sorted output.
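The merge above can be sketched with Python's heapq.merge, which combines sorted runs without re-sorting; 4 runs and a merge factor of 2 are used here for brevity, and the keys are made up (real Hadoop merge scheduling is more subtle):

```python
import heapq

# Sorted runs, as the map side would have spilled them.
runs = [["a", "d"], ["b", "c"], ["e", "g"], ["f", "h"]]
MERGE_FACTOR = 2

# Merge runs in batches of MERGE_FACTOR until few enough remain,
# then do a final merge that feeds the reduce function directly.
while len(runs) > MERGE_FACTOR:
    batch = [runs.pop(0) for _ in range(MERGE_FACTOR)]
    runs.append(list(heapq.merge(*batch)))

final = list(heapq.merge(*runs))
print(final)  # ['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h'] - sort order preserved
```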

Configuration Tuning

  • Avoiding multiple spills to disk during shuffle
  • On the reduce side, try to keep intermediate data entirely in memory; see Table 6-1

Speculative Execution: Detect when a task is running slower than expected and launches another equivalent task as a backup. This is termed speculative execution of tasks.


Hadoop Streaming

  • I/p received through stdin
  • O/p written to stdout
  • The reducer sees a sorted stream of (key, value) lines, so the user must write logic to keep track of key changes

Hadoop Definitive Guide Byte Sized Notes - 4

Chapter 5 Map Reduce Process Flow

General dev process
  • Write & test map & reduce functions in dev/local
  • Deploy job to cluster
  • Tune job later
The Configuration API
  • Collection of configuration properties and their values are instance of Configuration class (org.apache.hadoop.conf package)
  • Properties are stored in xml files
  • Default properties are stored in core-default.xml; there are multiple conf files for multiple settings
  • If mapred.job.tracker is set to local then execution is local job runner mode
Setting up Dev
  • Pseudo-distributed cluster is one whose daemons all run on the local machine
  • Use MRUnit for testing Map/Reduce
  • Write separate classes for cleaning the data 
  • Tool, toolrunner helpful for debugging, parsing CLI
Running on Cluster
  • Job's classes must be packaged to a jar file
  • setJarByClass() tells hadoop to find the jar file for your program
  • Use Ant or Maven to create jar file
  • On a cluster map and reduce tasks run in separate JVM
  • waitForCompletion() launches the job and polls for progress
  • Ex: 275GB of input data read from 34GB of compressed files broken into 101Gzip files
  • Typical job ID format: job_YYYYMMDDhhmm_XXXX
  • Tasks belong to jobs; typical task ID format: task_jobid_m_XXXXXX (m -> map, r -> reduce)
  • Use the MR web UI to track jobs: jobtracker page, tasks page, task details page
Debugging Job
  • Use WebUI to analyse tasks & tasks detail page 
  • Use custom counters
  • Hadoop Logs: Common logs for system daemons, HDFS audit, MapRed Job hist, MapRed task hist
Tuning Jobs

Can I make it run faster? Analysis done after job is run
  • If mappers are taking only a short time each, reduce the number of mappers and let each one run longer
  • # of reducers should be slightly less than the number of reduce slots
  • Combiners (for associative operations) reduce the map output
  • Map output compression
  • Serialization formats other than Writable
  • Shuffle tweaks
  • Profiling Tasks: HPROF profiling tool comes with JDK
More than one MapReduce & workflow management

  • ChainMapper/ChainReducer to run multiple maps
  • Linear chain job flow 
    • JobClient.runJob(conf1);
    • JobClient.runJob(conf2);
  • Apache Oozie used for running dependent jobs, composed of workflow engine, coordinator engine (Lab)
    • unlike JobControl Oozie runs as a service 
    • workflow is a DAG of action node and control flow nodes
    • Action node moving files in HDFS, MR, Pig, Hive
    • Control Flow coordinates actions nodes
    • Workflows are written in XML; a simple workflow has three control-flow nodes and one action node: a start control node, a map-reduce action node, a kill control node, and an end control node
    • All workflows must have one start and one end node.
    • Packaging, deploying and running oozie workflow job
    • Triggered by predicates, usually a time interval, a date, or an external event

Saturday, October 25, 2014

Hadoop Definitive Guide Byte Sized Notes - 3

Chapter 4 Hadoop IO
Data Integrity in Hadoop
  • For data integrity CRC-32 is usually used, but with low-end hardware it could be the checksum that is corrupt, not the data
  • Datanodes are responsible for verifying the data they receive before storing the data and its checksum
  • Applies to data that they receive from clients and from other datanodes during replication
  • Each datanode keeps a persistent log of checksum verifications
  • Hadoop LocalFileSystem performs client-side check summing
  • Creates hidden .filename.crc
  • To disable checksum use RawLocalFileSystem
  • FileSystem fs = new RawLocalFileSystem();
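The client-side checksumming idea can be sketched in Python with zlib's CRC-32 (illustrative only; function names and data are made up, and HDFS checksums chunks rather than whole writes):

```python
import zlib

# Compute a CRC-32 when writing; verify it on read. A mismatch means the
# data (or, on flaky hardware, the checksum itself) was corrupted.
def write_with_checksum(data: bytes):
    return data, zlib.crc32(data)

def verify(data: bytes, checksum: int) -> bool:
    return zlib.crc32(data) == checksum

data, crc = write_with_checksum(b"hello hdfs")
assert verify(data, crc)
assert not verify(b"hello hdfs!", crc)  # corruption is detected
print("checksum ok")
```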
Compression
  • Less space needed to store files, faster data transfer across the network
  • Splittable shows whether the compression format supports splitting, that is, whether you can seek to any point in the stream and start reading from some point further on
  • Compression algorithms have a space/time trade-off
    • Gzip sits in the middle
    • Bzip2 compresses better than gzip but is slower
    • LZO, LZ4, and Snappy are faster
    • Only bzip2 is splittable
  • A codec is the implementation of a compression-decompression algorithm
  • We can compress file before writing, for reading a compressed file, CompressionCodecFactory is used to find file name extension and appropriate algo is used
Compression and Input Splits
  • Use container file format such as Sequence, RCFile or Avro
  • Use compression algo which support splitting
Using Compression in MapReduce
  • To compress the output of a MapReduce job, in the job configuration, set the mapred.output.compress property to true
  • We can compress output of a map
  • Serialization is the process of converting objects to a byte stream for transmission over the network; de-serialization is the opposite
  • In Hadoop, inter-process communication between nodes is implemented using remote procedure calls (RPCs); RPC serialization should be compact, fast, extensible, and interoperable
  • Hadoop uses its own serialization format, Writables; Avro is used to overcome the limitations of the Writable interface
  • The Writable Interface 
            The interface for serialization
            IntWritable is a wrapper for a Java int; use set() to assign a value
            IntWritable writable = new IntWritable();
            writable.set(163);
            // or, equivalently
            IntWritable writable = new IntWritable(163);
  • For writing we use DataOutput binary stream and for reading we use DataInput binary stream
  • DataOutput converts 163 into a byte array; to de-serialize, the byte array is converted back to an IntWritable
  • WritableComparable and comparators
    • Comparison of types is crucial for MapReduce
    • IntWritable implements the WritableComparable interface
    • This interface permits users to compare records read from a stream without deserializing them into objects (no overhead of object creation)
    • keys must be WritableComparable because they are passed to reducer in sorted order
    • There are wrappers for most data types in Java, boolean, byte, short, int, float, long, double, text is UTF-8 sequence
    • BytesWritable is a wrapper for an array of binary data
    • NullWritable is a special type of Writable, as it has a zero-length serialization. No bytes are written to or read from the stream
    • ObjectWritable is a general-purpose wrapper for the following: Java primitives, String, enum, Writable, null, or arrays of any of these type
  • Writable collections
    • There are six Writable collection types, e.g. for a 2D array use TwoDArrayWritable
    • We can create custom writable class and it can implements WritableComparable (Lab)
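The IntWritable round trip described above (163 serialized to a 4-byte array and back) can be sketched in Python with struct; this mimics DataOutput's big-endian int encoding but is illustrative, not Hadoop code:

```python
import struct

# An IntWritable serializes a Java int as 4 big-endian bytes.
def serialize(value: int) -> bytes:
    return struct.pack(">i", value)   # DataOutput-style big-endian int

def deserialize(data: bytes) -> int:
    return struct.unpack(">i", data)[0]

raw = serialize(163)
print(raw.hex())   # 000000a3 - the four bytes on the wire
assert len(raw) == 4
assert deserialize(raw) == 163
```

Because the encoding is fixed-width and big-endian, two serialized ints can be compared byte-by-byte, which is the point of the raw-comparator bullets above.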
Serialization Frameworks
  • Hadoop has an API for pluggable serialization frameworks
Why Avro?
  • A language-neutral data serialization system
  • Schemas are written in JSON; data is encoded in binary
  • Language-independent schemas with rich schema resolution capabilities
  • .avsc is the conventional extension for an Avro schema, .avro is data
  • Avro’s language interoperability
File-Based Data Structures
  • Special data structures to hold data
  • Binary encoded key, value pairs
  • Plain text is not suitable for storing binary data such as log files
SequenceFile
  • Useful for storing binary key value pairs, .seq ext
  • SequenceFileInputFormat reads binary (k,v)
  • SequenceFileAsTextInputFormat yields (key.toString(), value.toString())
MapFile
  • A sorted SequenceFile with an index to permit lookups by key
  • MapFile.Writer to write, .map ext
  • The index contains the keys
  • Variations of MapFile
    • SetFile for storing Writable keys
    • ArrayFile with an int index
    • BloomMapFile for a fast get()
  • The fix() method is usually used for recreating corrupted indexes 
  • Sequence files can be converted to map files

Monday, October 20, 2014

Hadoop Definitive Guide Byte Sized Notes - 2

Chapter 3 HDFS
  • Hadoop can be integrated with local file system, S3 etc
  • Writes always happen at the end of the file; data cannot be appended in between
  • Default block size 64 MB
Name Node
  • Name node holds the file system metadata, namespace and tree  in memory, without this metadata there is no way to access files in cluster 
  • Stores this info on local disk in 2 files name space image & edit log
  • NN keeps track of which blocks makes up a file
  • Block locations are dynamically updated
  • NN must be running at all times
  • When a client reads a file, the NN will not be a bottleneck (data is transferred directly from the data nodes)
Secondary Name Node 
  • Periodically merges the name node's namespace image with the edit log and keeps a copy of the result
  • In case of name node failure, the merged namespace image and edit log can be used for recovery
HDFS Federation
  • Clusters can scale by adding name nodes, each NN manages a portion of file system namespace 
  • Namespace volumes are independent, Name Nodes don’t communicate with each other and doesn’t affect the availability of other name nodes
High Availability 
  • Pair of NN in active-stand by config
  • The NNs use highly available shared storage to share the edit log
  • DN send block reports for both the NN
Fail Over
  • A failover controller monitors the NNs for failure; if a failure is detected, failover is triggered gracefully
  • The previously active name node must be prevented from doing damage (fencing); the following help prevent corruption:
    • Disable the network port
    • Kill all process
    • Revoke access to shared storage
File System
  • Cannot execute a file in HDFS
  • Replication not applicable to directories 
  • Globbing wild cards to match multiple files in a directory, sample file selection: input/ncdc/all/190{1,2}.gz

Reading Data 
  • The FileSystem API contains static factory methods to open an i/p stream
  • A file in HDFS is represented by a Path object
  • Default file system properties are stored in conf/core-site.xml  
    • FSDataInputStream
      • FileSystem uses FSDataInputStream object to open a file
      • positioned readable, seek() expensive
  • If FSDataInputStream hits a failed or corrupt block, it reports the error to the NN and reads from the next-closest replica
Writing Data
  • FSDataOutputStream main class
  • If FSDataOutputStream fails then pipeline is closed, partial blocks are deleted, bad data node removed from pipeline, NN replicates under replicated blocks
Replica Placement
  • 1st replica at random node, 2nd replica off rack to random node, 3rd replica same rack as 2nd but diff node, applicable to reducers output as well
  • Same data across all replicated data nodes, sync()
HAR Hadoop Archives
  • Typically used for large numbers of small files
  • Packs all small blocks efficiently 
  • Archive tool
  • Limitations
    • No archive compression
    • HARs are immutable; to add or remove files you must re-create the HAR

Friday, October 17, 2014

Hadoop Definitive Guide Byte Sized Notes - 1

Chapter 1

A common way of avoiding data loss is through replication. Storage: HDFS; analysis: MapReduce.

Disk Drive

  • Seek Time: the time taken to move the disk's head to a particular location on the disk to read or write data; a typical RDBMS uses B-Trees, which have good seek characteristics
  • Transfer Rate: the streaming of data (disk bandwidth); Big Data apps need a high transfer rate, because data is written once and read many times

New in Hadoop 2.X
  • HDFS Federation
  • YARN MR2
  • Secure Authentication
Chapter 2 MapReduce

i/p -> map -> shuffle -> sort -> reduce -> o/p
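The pipeline above can be simulated in-process with a minimal Python word count (illustrative only, not the Hadoop API), making the shuffle/sort step between map and reduce explicit:

```python
from itertools import groupby
from operator import itemgetter

def run_pipeline(lines):
    # map: each line -> list of (word, 1) pairs
    intermediate = [(w, 1) for line in lines for w in line.split()]
    # shuffle & sort: bring equal keys together, in key order
    intermediate.sort(key=itemgetter(0))
    # reduce: sum the values for each key
    return [(key, sum(v for _, v in group))
            for key, group in groupby(intermediate, key=itemgetter(0))]

print(run_pipeline(["to be or", "not to be"]))
# [('be', 2), ('not', 1), ('or', 1), ('to', 2)]
```

In real Hadoop the shuffle/sort step is done by the framework between the map and reduce tasks; only the map and reduce functions are user code.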

Some diff between MR1/MR2
  1. mapred -> mapreduce
  2. Context object unifies JobConf, OutputCollector, Reporter
  3. JobClient deprecated
  4. map o/p: part-m-nnnnn, reduce o/p: part-r-nnnnn
  5. iterator to iterable  
Data Locality
  • To run a map task on the data node where the data resides
  • map task writes output to local disk not HDFS
  • No data locality for reduce, o/p of all mappers gets transferred to reduce node
  • # of reduce tasks are not governed by size of i/p
  • 0 reduce tasks are possible
Combiner
  • Performed on the output of the map task
  • Requires an associative property: max(0,20,10,15,25) = max(max(0,20,10), max(25,15))
  • Boosts performance by pre-filtering data before it is sent across the wire, so the reduce has a smaller data set to work with