Elegant Data

How to read HBase table from Scala Spark


Step 1: Create a dummy table called customers in HBase, please refer this link on how to populate this table https://mapr.com/products/mapr-sandbox-hadoop/tutorials/tutorial-getting-started-with-hbase-shell/


hbase(main):004:0> scan '/user/user01/customer'
ROW                                  COLUMN+CELL                                                                                              
 amiller                             column=addr:state, timestamp=1497809527266, value=TX                                                     
 jsmith                              column=addr:city, timestamp=1497809526053, value=denver                                                  
 jsmith                              column=addr:state, timestamp=1497809526080, value=CO                                                     
 jsmith                              column=order:date, timestamp=1497809490021, value=10-18-2014                                             
 jsmith                              column=order:numb, timestamp=1497809526118, value=6666                                                   
 njones                              column=addr:city, timestamp=1497809525860, value=miami                                                   
 njones                              column=addr:state, timestamp=1497809526151, value=TX                                                     
 njones                              column=order:numb, timestamp=1497809525941, value=5555                                                   
 tsimmons                            column=addr:city, timestamp=1497809525998, value=dallas                                                  
 tsimmons                            column=addr:state, timestamp=1497809526023, value=TX                                                     
4 row(s) in 0.0310 seconds

Step 2:  Next is reading this table in Spark, I used spark-shell to read the table and keyValueRDD is what we are looking for 

[mapr@maprdemo ~]$ /opt/mapr/spark/spark-2.1.0/bin/spark-shell 
Spark context Web UI available at http://10.0.2.15:4040
Spark context available as 'sc' (master = local[*], app id = local-1497831718510).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.1.0-mapr-1703
      /_/
         
Using Scala version 2.11.8 (OpenJDK 64-Bit Server VM, Java 1.8.0_91)
Type in expressions to have them evaluated.
Type :help for more information.

scala> 

scala> import org.apache.spark._
import org.apache.spark._

scala> import org.apache.spark.rdd.NewHadoopRDD
import org.apache.spark.rdd.NewHadoopRDD

scala> import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor}
import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor}

scala> import org.apache.hadoop.hbase.client.HBaseAdmin
import org.apache.hadoop.hbase.client.HBaseAdmin

scala> import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.mapreduce.TableInputFormat

scala> import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.Path

scala> import org.apache.hadoop.hbase.HColumnDescriptor
import org.apache.hadoop.hbase.HColumnDescriptor

scala> import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.util.Bytes

scala> import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Put

scala> import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.HTable

scala> 

scala> 

scala> val tableName="/user/user01/customer"
tableName: String = /user/user01/customer

scala> 

scala> 

scala> val hconf = HBaseConfiguration.create()
hconf: org.apache.hadoop.conf.Configuration = Configuration: core-default.xml, org.apache.hadoop.conf.CoreDefaultProperties, core-site.xml, mapred-default.xml, org.apache.hadoop.mapreduce.conf.MapReduceDefaultProperties, mapred-site.xml, yarn-default.xml, org.apache.hadoop.yarn.conf.YarnDefaultProperties, yarn-site.xml, hdfs-default.xml, hdfs-site.xml, hbase-default.xml, hbase-site.xml

scala> 

scala> hconf.set(TableInputFormat.INPUT_TABLE, tableName)

scala> 

scala> 

scala> 

scala> val admin = new HBaseAdmin(hconf)
warning: there was one deprecation warning; re-run with -deprecation for details
admin: org.apache.hadoop.hbase.client.HBaseAdmin = org.apache.hadoop.hbase.client.HBaseAdmin@2093bb6c

scala> 

scala> val hBaseRDD = sc.newAPIHadoopRDD(hconf, classOf[TableInputFormat], classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable], classOf[org.apache.hadoop.hbase.client.Result])
hBaseRDD: org.apache.spark.rdd.RDD[(org.apache.hadoop.hbase.io.ImmutableBytesWritable, org.apache.hadoop.hbase.client.Result)] = NewHadoopRDD[0] at newAPIHadoopRDD at :38

scala> val result = hBaseRDD.count()
result: Long = 4

scala> 

scala> val resultRDD = hBaseRDD.map(tuple => tuple._2) 
resultRDD: org.apache.spark.rdd.RDD[org.apache.hadoop.hbase.client.Result] = MapPartitionsRDD[1] at map at :40

scala> resultRDD
res1: org.apache.spark.rdd.RDD[org.apache.hadoop.hbase.client.Result] = MapPartitionsRDD[1] at map at :40

scala> val keyValueRDD = resultRDD.map(result =>
     | (Bytes.toString(result.getRow()).split(" ")(0),
     | Bytes.toString(result.value)))
keyValueRDD: org.apache.spark.rdd.RDD[(String, String)] = MapPartitionsRDD[2] at map at :44

scala> keyValueRDD.collect()
res2: Array[(String, String)] = Array((amiller,TX), (jsmith,denver), (njones,miami), (tsimmons,dallas))

Bill Gates 2030 Vision

I recently watched this video https://youtu.be/8RETFyDKcw0, Verge interviewed Bill Gates about his vision for 2030,  this is the man who predicted every home will have a PC, which turned out to be true, what is Mr.Gates vision fifteen years from would be..

Four key areas for improvement health care, farming, banking and education.


Key take aways:
  • This man is serious about his goals/visions, each sector has very specific goals, its very hard to come up with goals/vision
  • Health
    • Upstream: inventing new vaccines specifically for kids less than five years
    • Downstream: How do you get them out to kids around the worls
    • Goal: Currently one out of twenty kids dies before age of 5, this should increase to one in forty 
  • Farming
    • Better seeds with resistance to heat & low water, which hints GMO stuff but at least educating farmers about the benefits
    • Improved credit & loan systems for farmers
    • Increase world food productivity 
  • Education 
    • Emphasis on online learning 
    • Improve critical programming skills
    • Basics of reading & writing 
    • Tablet computing should have a sandbox to test new code  
  • Banking
    • In my view banking vision is radical and I see it coming soon, for small transactions banks are loosing money
    • Banking digital infrastructure should be revised to create a utility type service that lets you move money to pay someone else or switch from one bank to another. This calls in for a new regulatory infrastructure where money transfer system is licensed like phone numbers with banks (switching bank accounts should be like switching phone services)  

Hadoop Definitive Guide Byte Sized Notes - 6

Chapter 7 MapReduce Types and Formats

General form of map & reduce 
  • map: (k1,v1) -> list(k2,v2)
  • reduce: (k2,list(v2)) ->list(k3,v3)
  • Keys emitted during the map phase must implement WritableComparable so that the keys can be sorted during the shuffle and sort phase.
  • Reduce input must have the same types as the map output
  • The number of map tasks are not set, the number of map tasks are equal to the number of splits that the input is turned 
Input formats 
  • Data is passed to the mapper by InputFormat, Input Format is a factory of RecordReader object to extract the (key, value) pairs from input source
  • Each mapper deals with single input split no matter the file size
  • Input Split is the chunk of the input processed by a single map, represented by InputSplit
  • InputSplits created by InputFormat, FileInputFormat default for file types 
  • Can process Text, XML, Binary 
RecordReader 
  • Ensure key, value pair is processed
  • Ensure (k, v) not processed more than once
  • Handle (k,v) which get split
Output formats: Text output, binary output 


Chapter 8 MapReduce Features

Counters
  • Useful for QC, problem diagnostics
  • Two counter groups task & job
    • Task Counter
      • gather information about task level statistics over execution time
      • counts info like, input records, skipped records, output bytes etc
      • they are sent across the network
    • Job Counter
      • measure job level statistics
    • User defined counters 
      • Counters are defined by a Java enum
      • Counters are global
Sorting
  • SortOrder controlled by RawComparator
  • Partial sort, secondary sort (Lab)
Joins

To run map-side join use CompositeInputFormat from the org.apache.hadoop.mapreduce.joinpackage

Side Data: Read only data required to process main data set, ex skip words for wordcount

Distributed Cache: To save network bandwidth, side data files are normally copied to any particular node once per job.

Map only Jobs: Image Processing, ETL, Select * from table, distcp, I/p data sampling 

Partitioning 

Map/reduce job will contains most of the time more than  1 reducer.  So basically, when a mapper emits a key value pair, it has to be sent to one of the reducers. Which one? The mechanism sending specific key-value pairs to specific reducers is called partitioning (the key-value pairs space is partitioned among the reducers). A Partitioner is responsible to perform the partitioning.

For more info this article has detailed explanation 
http://www.philippeadjiman.com/blog/2009/12/20/hadoop-tutorial-series-issue-2-getting-started-with-customized-partitioning/

Zero reducers only map tasks
  • One reducer when the amount of data is small enough to be processed comfortably by one reducer
Combiners
  • Used for reducing intermediate data
  • Runs locally on single mapper’s  o/p
  • Best suited for commutative and associative operations
  • Code identical to Reducer 
  • conf.setCombinerClass(Reducer.class);
  • May or may not run on the output from some or all mappers

Hadoop Definitive Guide Byte Sized Notes - 5

Chapter 6 Anatomy of a MapReduce Job Run

Frameworks used for execution is set by mapreduce.framework.name property

  • local -> local job runner
  • classic -> MR1
  • yarn -> MR2

MR1 - Classic

  • Client submits MR job
  • JT coordinates the job run
  • JT resides on master node
  • TT is actually responsible for instantiating map/reduce tasks

Map Task

  • May completely choose to ignore the input key
  • Map outputs zero or more K/V pairs
  • Typical mappers 
    • Convert to Upper Case
    • Explode Mapper 
    • Filter Mapper
    • Changing Keyspace

Reduce Task

  • All values associated with a particular intermediate key are guaranteed to go the same reducer
  • Intermediate keys and values are passed to reducer in sorted key order  
  • Typical Reducers
    • Sum Reducer
    • Identity Reducer 

Progress in MR1 means

  • Reading an input record (in a mapper or reducer)
  • Writing an output record (in a mapper or reducer)
  • Setting the status description on a reporter (using Reporter’s setStatus()method)
  • Incrementing a counter (using Reporter’s incrCounter()method)
  • Calling Reporter’s progress()method
Handling Job/Failures in MR1

Mainly three failure modes

  • Failure to run task
    • If task (map/reduce) produces run time error then TT sends signal to JT to notify failed task attempt, JT will retry for 4 times
    • The # of attempts is set by mapred.map.max.attempts/mapred.map.reduce.attempts
    • JT will abort task and report failure 
    • A task attempt may also be killed, which is different from failing, during speculative execution task attempt is killed
    • Killed task attempts do not count against the number of attempts to run the task
  • Failure of task tracker
    • if TT is running slow or crashes, TT wont send heartbeat signal to JT
    • If JT did not receive heartbeat signal for 10 min then JT will respawn a diff TT to complete the failed TT map/reduce task
    • A TT is blacklisted if the number of faults is over some minimum threshold (the default is four)
  • Failure of JT
    • Restarting a jobtracker, any jobs that were running at the time it was stopped will need to be resubmitted.
Limitations of MR1

  • JT single point of failure
  • JT bottleneck; Hive, PIG, MR all get converted to MR jobs and are submitted to JT
  • JT does job scheduling & task progress, restarting failed nodes

MR2 - YARN

  • YARN splits JT into two daemons Resource Manager RM & Application Master AM, Node Managers, yarn-site.xml
  • RM negotiates AM for cluster resources and allocates resources. Node Managers run on data nodes to ensure the application is using only the allocated resources
  • MR has dedicated AM which runs for duration of app

Failure in YARN

Mainly four failure modes

  • Failure to run task
    • hanging/failed tasks reported to AM 
  • Failure in Application Master
    • Apps in YARN are tried multiple times in the event of failure
    • AM sends periodic heartbeats to the RM, if RM detects failure, a new AM is started
    • The client polls the application master for progress reports
  • Failure in Node Manager
    • It will stop sending heartbeats to the resource manager
    • Node manager will be removed from the resource manager’s pool (default 10 min)
    • Node managers may be blacklisted if the number of failures for the application is high. Blacklisting is done by the application master. resource manager does not do blacklisting
  • Failure in Resource Manager
    • SPOF, may be recovered from saved state. The state consists of the node managers in the system as well as the running applications

Job Scheduling

  • Scheduling users jobs, FIFO queue based scheduler
  • Multi user -Fair & Capacity

Fair Scheduler

  • Every user has a fair share of the cluster
  • Supports preemption
  • Scheduler will kill tasks in pools running over capacity in order to give more slots to the pool running under capacity

Capacity Scheduler 
Simulate a separate MapReduce cluster with FIFO scheduling for each user or organization. Within each queue, jobs are scheduled using FIFO scheduling

Shuffle and Sort

Shuffle (Map): The process by which the system performs the sort—and transfers the map outputs to the reducers as inputs in a sorted order is known as shuffle.

Map Side 

  • Map outputs are written to a circular memory buffer, a background thread sorts the keys in memory, when the buffer reaches 80% then a thread will spill the contents to disk.
  • Before the spill combiner can be run
  • Its better to compress the output of the map file
  • The output partitions are transferred over the HTTP to reducer

Reduce Side 

  • How do reducers know which machines to fetch map output from?
    • After map output is completed the TT notifies JT (AM)
    • JT(AM) keeps a log of host:map_output
    • Thread in reducer asks JT(AM) for host:map_output
    • Once we have host:map_output the copy phase begins by the reducer thread node, map outputs are copied over HTTP to reducer
    • Map outputs are deleted only when they are told to by JT(AM)

Sort (Reduce)

  • Merges the map outputs, maintaining their sort ordering
  • Ex: 50 map outputs and the merge factor is 10, then we have 5 intermediate files
  • Finally reduce function is invoked for each key in the sorted output.

Configuration Tuning

  • Avoiding multiple spills to disk during shuffle
  • On Reduce try to write intermediate data entirely in memory, See Table 6-1

Speculative Execution: Detect when a task is running slower than expected and launches another equivalent task as a backup. This is termed speculative execution of tasks.

Streaming:

  • I/P received through stdin
  • O/P written to stdout
  • All key values in reducer are iterators so user must write logic to keep track of key change



Hadoop Definitive Guide Byte Sized Notes - 4

Chapter 5 Map Reduce Process Flow

General dev process
  • Write & test map & reduce functions in dev/local
  • Deploy job to cluster
  • Tune job later
The Configuration API
  • Collection of configuration properties and their values are instance of Configuration class (org.apache.hadoop.conf package)
  • Properties are stored in xml files
  • Default properties are stored in core-dafult.xml, there are multiple conf files for multiple settings
  • If mapred.job.tracker is set to local then execution is local job runner mode
Setting up Dev
  • Pseudo-distributed cluster is one whose daemons all run on the local machine
  • Use MRUnit for testing Map/Reduce
  • Write separate classes for cleaning the data 
  • Tool, toolrunner helpful for debugging, parsing CLI
Running on Cluster
  • Job's classes must be packaged to a jar file
  • setJarByClass() tells hadoop to find the jar file for your program
  • Use Ant or Maven to create jar file
  • On a cluster map and reduce tasks run in separate JVM
  • waitForCompletion() launches the job and polls for progress
  • Ex: 275GB of input data read from 34GB of compressed files broken into 101Gzip files
  • Typical Job id format: job_YYYYMMDDTTmm_XXXX
  • Tasks belong to Jobs, typical TaskID format task_Jobid_m_XXXXXX; m->;map, r->;reduce
  • Use MR WebUI to track Jobtracker page/ tasks page/nematode
Debugging Job
  • Use WebUI to analyse tasks & tasks detail page 
  • Use custom counters
  • Hadoop Logs: Common logs for system daemons, HDFS audit, MapRed Job hist, MapRed task hist
Tuning Jobs

Can I make it run faster? Analysis done after job is run
  • If mappers are taking less time, then reduce mappers, let each one run longer
  • # of reducers should be less than reduce slots 
  • Combiners associate properties, reduce map output
  • Map output compression
  • Serialization other than Writeable
  • Shuffle tweaks
  • Profiling Tasks: HPROF profiling tool comes with JDK
More than one MapReduce & workflow management

  • ChainMapper/ChainReducer to run multiple maps
  • Linear chain job flow 
    • JobClient.runJob(conf1);
    • JobClient.runJob(conf2);
  • Apache Oozie used for running dependent jobs, composed of workflow engine, coordinator engine (Lab)
    • unlike JobControl Oozie runs as a service 
    • workflow is a DAG of action node and control flow nodes
    • Action node moving files in HDFS, MR, Pig, Hive
    • Control Flow coordinates actions nodes
    • Workflows written in XML and has three control-flow nodes and one action node: a start control node, a map-reduce action node, a kill control node, and an end control node. 
    • All workflows must have one start and one end node.
    • Packaging, deploying and running oozie workflow job
    • Triggered by predicates usual time interval or date or external event 


Hadoop Definitive Guide Byte Sized Notes - 3

Chapter 4 Hadoop IO
 
Data Integrity in Hadoop
  • For data integrity usually CRC-32 is used but with low end hardware it could be checksum which is corrupt not the data
  • Datanodes are responsible for verifying the data they receive before storing the data and its checksum
  • Applies to data that they receive from clients and from other datanodes during replication
  • Each datanode keeps a persistent log of checksum verifications
LocalFileSystem
  • Hadoop LocalFileSystem performs client-side check summing
  • Creates hidden .filename.crc
  • To disable checksum use RawLocalFileSystem
  • FileSystem fs = new RawLocalFileSystem();
Compression
  • More space to store files, faster data transfer across network
  • Splittable shows whether the compression format supports splitting, that is, whether you can seek to any point in the stream and start reading from some point further on
  • Compression algo have space/time trade off
    • Gzip sits in middle
    • BZip compression better than Gzip but slow
    • LZO, LZ4,Snappy are faster
    • BZip is only splittable
Codecs
  • A codec is the implementation of a compression-decompression algorithm
  • We can compress file before writing, for reading a compressed file, CompressionCodecFactory is used to find file name extension and appropriate algo is used
Compression and Input Splits
  • Use container file format such as Sequence, RCFile or Avro
  • Use compression algo which support splitting
Using Compression in MapReduce
  • To compress the output of a MapReduce job, in the job configuration, set the mapred.output.compress property to true
  • We can compress output of a map
Serialization
  • Serialization process of converting objects to byte stream for transmitting over network, De-serialization is the opposite
  • In Hadoop, inter-process communication between nodes is implemented, using remote procedure calls(RPCs), RPC serialization should be compact, fast, extensible, and interoperable
  • Hadoop uses its own serialization format, Writables, Avro is used to overcome limitation of Writables interface
  • The Writable Interface 
            Interface for serialization
            IntWritable wrapper for Java int, use set() to assign value
            IntWritable writable = new IntWritable();
            writable.set(163);
            or 
            IntWritable writable = new IntWritable(163);
  • For writing we use DataOutput binary stream and for reading we use DataInput binary stream
  • DataOutput convert 163 into byte array, to de serialize the byte array is converted back to IntWritable
  • WritableComparable and comparators
    • Comparison of types is crucial for MapReduce
    • IntWritable implements the WritableComparable interface
    • This interface permits users to compare records read from a stream without de serializing them into objects (no overhead of object creation)
    • keys must be WritableComparable because they are passed to reducer in sorted order
    • There are wrappers for most data types in Java, boolean, byte, short, int, float, long, double, text is UTF-8 sequence
    • BytesWritable is a wrapper for an array of binary data
    • NullWritableis a special type of Writable, as it has a zero-length serialization. No bytes are written to or read from the stream
    • ObjectWritable is a general-purpose wrapper for the following: Java primitives, String, enum, Writable, null, or arrays of any of these type
  • Writable collections
    • There are six Writablecollection types, ex for 2D array use TwoDArrayWritable
    • We can create custom writable class and it can implements WritableComparable (Lab)
Serialization Frameworks
  • Hadoop has an API for pluggable serialization frameworks
Avro
  • Language-neutral data serialization system
Why Avro?
  • Written in JSON, encoded in binary
  • Language-independent schema has rich schema resolution capabilities
  • .avsc is the conventional extension for an Avro schema, .avro is data
  • Avro’s language interoperability
File-Based Data Structures
  • Special data structure to hold data
SequenceFile
  • Binary encoded key, value pairs
  • To save log files text is not suitable
  • Useful for storing binary key value pairs, .seq ext
  • SequenceFileInputFormat Binary file (k,v)
  • SequenceFileAsTextInputFormat (key.toString(), value.toString())
MapFile
  • Sorted SequenceFilewith an index to permit lookups by key
  • MapFile.Writer to write, .map ext
  • Numbers.map/index contains keys
  • Variation of mapfile
    • SetFile for storing Writable keys
    • ArrayFile int index
    • BloomMapFile fast get()
  • Fix() method is usually used for recreating corrupted indexes 
  • Sequence files can be converted to mapfiles 

Hadoop Definitive Guide Byte Sized Notes - 2

Chapter 3 HDFS
  • Hadoop can be integrated with local file system, S3 etc
  • Writes always happen at the end of the file, cannot append  data in between
  • Default block size 64 MB
Name Node
  • Name node holds the file system metadata, namespace and tree  in memory, without this metadata there is no way to access files in cluster 
  • Stores this info on local disk in 2 files name space image & edit log
  • NN keeps track of which blocks makes up a file
  • Block locations are dynamically updates
  • NN must be running at all times
  • When clients reads a file NN will not be a bottleneck
Secondary Name Node 
  • Stores the partial of name node’s namespace image and edit log
  • In case of name node failure we can merge namespace image and edit log 
HDFS Federation
  • Clusters can scale by adding name nodes, each NN manages a portion of file system namespace 
  • Namespace volumes are independent, Name Nodes don’t communicate with each other and doesn’t affect the availability of other name nodes
High Availability 
  • Pair of NN in active-stand by config
  • NN’s share use shared storage to share edit log
  • DN send block reports for both the NN
Fail Over
  • NN runs a process to monitor failures, if failover detected then it gracefully terminates
Fencing
  • Previously active name node is prevented from doing damage following will help prevent corruption of active name node
    • Disable the network port
    • Kill all process
    • Revoke access to shared storage
File System
  • Cannot execute a file in HDFS
  • Replication not applicable to directories 
  • Globbing wild cards to match multiple files in a directory, sample file selection: input/ncdc/all/190{1,2}.gz

Reading Data 
  • FileSystemAPI contains static factory methods to open i/p stream
  • A file in HDFS is path object
  • Default file system properties are stored in conf/core-site.xml  
    • FSDataInputStream
      • FileSystem uses FSDataInputStream object to open a file
      • positioned readable, seek() expensive
  • If FSDataInputStream fails then sends error to NN  and  NN sends the next closet block address
Writing Data
  • FSDataOutputStream main class
  • If FSDataOutputStream fails then pipeline is closed, partial blocks are deleted, bad data node removed from pipeline, NN replicates under replicated blocks
Replica Placement
  • 1st replica at random node, 2nd replica off rack to random node, 3rd replica same rack as 2nd but diff node, applicable to reducers output as well
Coherency 
  • Same data across all replicated data nodes, sync()
HAR Hadoop Archives
  • Typically used for small data sets
  • Packs all small blocks efficiently 
  • Archive tool
  • Limitations
    • No archive compression
    • Immutable to add or remove recreate HAR