Sunday, June 18, 2017

How to read HBase table from Scala Spark


Step 1: Create a dummy table called customer in HBase. For the full walkthrough of creating and populating it, see https://mapr.com/products/mapr-sandbox-hadoop/tutorials/tutorial-getting-started-with-hbase-shell/ (a short sketch of the shell commands follows the scan output below).


hbase(main):004:0> scan '/user/user01/customer'
ROW                                  COLUMN+CELL                                                                                              
 amiller                             column=addr:state, timestamp=1497809527266, value=TX                                                     
 jsmith                              column=addr:city, timestamp=1497809526053, value=denver                                                  
 jsmith                              column=addr:state, timestamp=1497809526080, value=CO                                                     
 jsmith                              column=order:date, timestamp=1497809490021, value=10-18-2014                                             
 jsmith                              column=order:numb, timestamp=1497809526118, value=6666                                                   
 njones                              column=addr:city, timestamp=1497809525860, value=miami                                                   
 njones                              column=addr:state, timestamp=1497809526151, value=TX                                                     
 njones                              column=order:numb, timestamp=1497809525941, value=5555                                                   
 tsimmons                            column=addr:city, timestamp=1497809525998, value=dallas                                                  
 tsimmons                            column=addr:state, timestamp=1497809526023, value=TX                                                     
4 row(s) in 0.0310 seconds
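
For reference, a table with this layout can be created and populated directly from the HBase shell along the following lines. This is only a sketch reconstructed from the scan above, not the linked tutorial; adjust the table path for your environment:

create '/user/user01/customer', 'addr', 'order'
put '/user/user01/customer', 'jsmith', 'addr:city', 'denver'
put '/user/user01/customer', 'jsmith', 'addr:state', 'CO'
put '/user/user01/customer', 'jsmith', 'order:numb', '6666'
put '/user/user01/customer', 'jsmith', 'order:date', '10-18-2014'
put '/user/user01/customer', 'njones', 'addr:city', 'miami'
put '/user/user01/customer', 'njones', 'addr:state', 'TX'
put '/user/user01/customer', 'njones', 'order:numb', '5555'
put '/user/user01/customer', 'tsimmons', 'addr:city', 'dallas'
put '/user/user01/customer', 'tsimmons', 'addr:state', 'TX'
put '/user/user01/customer', 'amiller', 'addr:state', 'TX'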

Step 2: Read this table from Spark. I used spark-shell; keyValueRDD at the end of the session is the RDD we are after (a note on pulling out specific columns follows the session).

[mapr@maprdemo ~]$ /opt/mapr/spark/spark-2.1.0/bin/spark-shell 
Spark context Web UI available at http://10.0.2.15:4040
Spark context available as 'sc' (master = local[*], app id = local-1497831718510).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.1.0-mapr-1703
      /_/
         
Using Scala version 2.11.8 (OpenJDK 64-Bit Server VM, Java 1.8.0_91)
Type in expressions to have them evaluated.
Type :help for more information.

scala> 

scala> import org.apache.spark._
import org.apache.spark._

scala> import org.apache.spark.rdd.NewHadoopRDD
import org.apache.spark.rdd.NewHadoopRDD

scala> import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor}
import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor}

scala> import org.apache.hadoop.hbase.client.HBaseAdmin
import org.apache.hadoop.hbase.client.HBaseAdmin

scala> import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.mapreduce.TableInputFormat

scala> import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.Path

scala> import org.apache.hadoop.hbase.HColumnDescriptor
import org.apache.hadoop.hbase.HColumnDescriptor

scala> import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.util.Bytes

scala> import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Put

scala> import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.HTable

scala> 

scala> 

scala> val tableName="/user/user01/customer"
tableName: String = /user/user01/customer

scala> 

scala> 

scala> val hconf = HBaseConfiguration.create()
hconf: org.apache.hadoop.conf.Configuration = Configuration: core-default.xml, org.apache.hadoop.conf.CoreDefaultProperties, core-site.xml, mapred-default.xml, org.apache.hadoop.mapreduce.conf.MapReduceDefaultProperties, mapred-site.xml, yarn-default.xml, org.apache.hadoop.yarn.conf.YarnDefaultProperties, yarn-site.xml, hdfs-default.xml, hdfs-site.xml, hbase-default.xml, hbase-site.xml

scala> 

scala> hconf.set(TableInputFormat.INPUT_TABLE, tableName)

scala> 

scala> 

scala> 

scala> val admin = new HBaseAdmin(hconf)
warning: there was one deprecation warning; re-run with -deprecation for details
admin: org.apache.hadoop.hbase.client.HBaseAdmin = org.apache.hadoop.hbase.client.HBaseAdmin@2093bb6c
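
The deprecation warning appears because constructing HBaseAdmin directly is deprecated in the newer HBase client API; the admin object is not actually used for the read below, so this step can be skipped. If you do need an Admin handle (for example to check that the table exists), the non-deprecated route is through ConnectionFactory, roughly as follows (a sketch assuming the HBase 1.x client API, not part of the original session):

import org.apache.hadoop.hbase.TableName
import org.apache.hadoop.hbase.client.ConnectionFactory

// Build a Connection from the same configuration and get an Admin from it.
val connection = ConnectionFactory.createConnection(hconf)
val admin = connection.getAdmin
// e.g. admin.tableExists(TableName.valueOf(tableName))
// remember to close both when done: admin.close(); connection.close()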

scala> 

scala> val hBaseRDD = sc.newAPIHadoopRDD(hconf, classOf[TableInputFormat], classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable], classOf[org.apache.hadoop.hbase.client.Result])
hBaseRDD: org.apache.spark.rdd.RDD[(org.apache.hadoop.hbase.io.ImmutableBytesWritable, org.apache.hadoop.hbase.client.Result)] = NewHadoopRDD[0] at newAPIHadoopRDD at <console>:38

scala> val result = hBaseRDD.count()
result: Long = 4

scala> 

scala> val resultRDD = hBaseRDD.map(tuple => tuple._2) 
resultRDD: org.apache.spark.rdd.RDD[org.apache.hadoop.hbase.client.Result] = MapPartitionsRDD[1] at map at <console>:40

scala> resultRDD
res1: org.apache.spark.rdd.RDD[org.apache.hadoop.hbase.client.Result] = MapPartitionsRDD[1] at map at <console>:40

scala> val keyValueRDD = resultRDD.map(result =>
     | (Bytes.toString(result.getRow()).split(" ")(0),
     | Bytes.toString(result.value)))
keyValueRDD: org.apache.spark.rdd.RDD[(String, String)] = MapPartitionsRDD[2] at map at <console>:44

scala> keyValueRDD.collect()
res2: Array[(String, String)] = Array((amiller,TX), (jsmith,denver), (njones,miami), (tsimmons,dallas))
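
Note that result.value returns only the first cell of each Result (the lexicographically first column), which is why every row collapses to a single value above, e.g. (jsmith,denver) comes from addr:city. To pull specific columns per row, Result.getValue(family, qualifier) can be used instead. A rough sketch, not from the original session; getValue returns null when a row has no such cell (as with amiller's missing addr:city), hence the Option wrapping:

val cityStateRDD = resultRDD.map { result =>
  // row key plus the addr:city and addr:state cells, defaulting to "" when absent
  val row   = Bytes.toString(result.getRow)
  val city  = Option(result.getValue(Bytes.toBytes("addr"), Bytes.toBytes("city"))).map(b => Bytes.toString(b)).getOrElse("")
  val state = Option(result.getValue(Bytes.toBytes("addr"), Bytes.toBytes("state"))).map(b => Bytes.toString(b)).getOrElse("")
  (row, city, state)
}

cityStateRDD.collect().foreach(println)
// should print something like: (amiller,,TX), (jsmith,denver,CO), (njones,miami,TX), (tsimmons,dallas,TX)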