Sunday, June 18, 2017

How to read an HBase table from Spark using Scala


Step 1: Create a dummy table called customer in HBase. For step-by-step instructions on creating and populating it, see the MapR HBase shell tutorial: https://mapr.com/products/mapr-sandbox-hadoop/tutorials/tutorial-getting-started-with-hbase-shell/ (a condensed sketch of the shell commands follows the scan output below).


hbase(main):004:0> scan '/user/user01/customer'
ROW                                  COLUMN+CELL                                                                                              
 amiller                             column=addr:state, timestamp=1497809527266, value=TX                                                     
 jsmith                              column=addr:city, timestamp=1497809526053, value=denver                                                  
 jsmith                              column=addr:state, timestamp=1497809526080, value=CO                                                     
 jsmith                              column=order:date, timestamp=1497809490021, value=10-18-2014                                             
 jsmith                              column=order:numb, timestamp=1497809526118, value=6666                                                   
 njones                              column=addr:city, timestamp=1497809525860, value=miami                                                   
 njones                              column=addr:state, timestamp=1497809526151, value=TX                                                     
 njones                              column=order:numb, timestamp=1497809525941, value=5555                                                   
 tsimmons                            column=addr:city, timestamp=1497809525998, value=dallas                                                  
 tsimmons                            column=addr:state, timestamp=1497809526023, value=TX                                                     
4 row(s) in 0.0310 seconds
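
If you would rather not walk through the whole tutorial, the shell commands below reproduce the table as scanned above. This is a sketch reconstructed from the scan output: on the MapR sandbox the table name is the path /user/user01/customer, and the two column families are addr and order.

create '/user/user01/customer', 'addr', 'order'
put '/user/user01/customer', 'jsmith', 'addr:city', 'denver'
put '/user/user01/customer', 'jsmith', 'addr:state', 'CO'
put '/user/user01/customer', 'jsmith', 'order:numb', '6666'
put '/user/user01/customer', 'jsmith', 'order:date', '10-18-2014'
put '/user/user01/customer', 'njones', 'addr:city', 'miami'
put '/user/user01/customer', 'njones', 'addr:state', 'TX'
put '/user/user01/customer', 'njones', 'order:numb', '5555'
put '/user/user01/customer', 'tsimmons', 'addr:city', 'dallas'
put '/user/user01/customer', 'tsimmons', 'addr:state', 'TX'
put '/user/user01/customer', 'amiller', 'addr:state', 'TX'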

Step 2: Read the table from Spark. I used spark-shell for this walkthrough; the keyValueRDD built at the end of the session is what we are after.

[mapr@maprdemo ~]$ /opt/mapr/spark/spark-2.1.0/bin/spark-shell 
Spark context Web UI available at http://10.0.2.15:4040
Spark context available as 'sc' (master = local[*], app id = local-1497831718510).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.1.0-mapr-1703
      /_/
         
Using Scala version 2.11.8 (OpenJDK 64-Bit Server VM, Java 1.8.0_91)
Type in expressions to have them evaluated.
Type :help for more information.

scala> 

scala> import org.apache.spark._
import org.apache.spark._

scala> import org.apache.spark.rdd.NewHadoopRDD
import org.apache.spark.rdd.NewHadoopRDD

scala> import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor}
import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor}

scala> import org.apache.hadoop.hbase.client.HBaseAdmin
import org.apache.hadoop.hbase.client.HBaseAdmin

scala> import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.mapreduce.TableInputFormat

scala> import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.Path

scala> import org.apache.hadoop.hbase.HColumnDescriptor
import org.apache.hadoop.hbase.HColumnDescriptor

scala> import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.util.Bytes

scala> import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Put

scala> import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.HTable

scala> val tableName="/user/user01/customer"
tableName: String = /user/user01/customer

scala> val hconf = HBaseConfiguration.create()
hconf: org.apache.hadoop.conf.Configuration = Configuration: core-default.xml, org.apache.hadoop.conf.CoreDefaultProperties, core-site.xml, mapred-default.xml, org.apache.hadoop.mapreduce.conf.MapReduceDefaultProperties, mapred-site.xml, yarn-default.xml, org.apache.hadoop.yarn.conf.YarnDefaultProperties, yarn-site.xml, hdfs-default.xml, hdfs-site.xml, hbase-default.xml, hbase-site.xml

scala> hconf.set(TableInputFormat.INPUT_TABLE, tableName)
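
As an aside: if you only need certain columns, TableInputFormat can restrict the scan through the same configuration object. A sketch (SCAN_COLUMNS takes a space-separated list of family:qualifier names, and must be set before the RDD is created):

hconf.set(TableInputFormat.SCAN_COLUMNS, "addr:city addr:state")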

scala> val admin = new HBaseAdmin(hconf)
warning: there was one deprecation warning; re-run with -deprecation for details
admin: org.apache.hadoop.hbase.client.HBaseAdmin = org.apache.hadoop.hbase.client.HBaseAdmin@2093bb6c
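
The deprecation warning is expected: HBaseAdmin is deprecated in the HBase 1.x client, and the admin handle is not actually used again in this walkthrough, so this step is optional. If you do want an Admin, for example to verify that the table exists, the un-deprecated route goes through ConnectionFactory. A sketch against the same hconf:

import org.apache.hadoop.hbase.TableName
import org.apache.hadoop.hbase.client.ConnectionFactory

val connection = ConnectionFactory.createConnection(hconf)
val admin = connection.getAdmin
// the newer API takes a TableName rather than a raw String
println(admin.tableExists(TableName.valueOf(tableName)))
admin.close()
connection.close()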

scala> val hBaseRDD = sc.newAPIHadoopRDD(hconf, classOf[TableInputFormat], classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable], classOf[org.apache.hadoop.hbase.client.Result])
hBaseRDD: org.apache.spark.rdd.RDD[(org.apache.hadoop.hbase.io.ImmutableBytesWritable, org.apache.hadoop.hbase.client.Result)] = NewHadoopRDD[0] at newAPIHadoopRDD at <console>:38

scala> val result = hBaseRDD.count()
result: Long = 4

scala> val resultRDD = hBaseRDD.map(tuple => tuple._2) 
resultRDD: org.apache.spark.rdd.RDD[org.apache.hadoop.hbase.client.Result] = MapPartitionsRDD[1] at map at <console>:40

scala> resultRDD
res1: org.apache.spark.rdd.RDD[org.apache.hadoop.hbase.client.Result] = MapPartitionsRDD[1] at map at <console>:40

scala> val keyValueRDD = resultRDD.map(result =>
     | (Bytes.toString(result.getRow()).split(" ")(0),
     | Bytes.toString(result.value)))
keyValueRDD: org.apache.spark.rdd.RDD[(String, String)] = MapPartitionsRDD[2] at map at <console>:44

scala> keyValueRDD.collect()
res2: Array[(String, String)] = Array((amiller,TX), (jsmith,denver), (njones,miami), (tsimmons,dallas))
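
A note on why there is exactly one value per row key: Result.value() returns only the first cell of the row in family:qualifier sort order, which is addr:city wherever it exists; amiller has no addr:city, so its addr:state (TX) shows up instead. The split(" ")(0) on the row key is also a no-op here, since none of these keys contain spaces. To read specific columns rather than whatever sorts first, use Result.getValue with an explicit family and qualifier. A sketch that continues the same session (the toDF call at the end assumes the standard spark-shell implicits):

import spark.implicits._

val addrRDD = resultRDD.map { result =>
  val key = Bytes.toString(result.getRow)
  // getValue returns null when a cell is absent, so guard with Option
  val city  = Option(result.getValue(Bytes.toBytes("addr"), Bytes.toBytes("city")))
                .map(b => Bytes.toString(b)).getOrElse("")
  val state = Option(result.getValue(Bytes.toBytes("addr"), Bytes.toBytes("state")))
                .map(b => Bytes.toString(b)).getOrElse("")
  (key, city, state)
}

addrRDD.collect().foreach(println)

// the same tuple RDD converts directly to a DataFrame if you need one
val df = addrRDD.toDF("name", "city", "state")
df.show()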
