Step 1: Create a dummy table called customer in HBase. For details on how to create and populate this table, see the MapR tutorial: https://mapr.com/products/mapr-sandbox-hadoop/tutorials/tutorial-getting-started-with-hbase-shell/
hbase(main):004:0> scan '/user/user01/customer'
ROW COLUMN+CELL
amiller column=addr:state, timestamp=1497809527266, value=TX
jsmith column=addr:city, timestamp=1497809526053, value=denver
jsmith column=addr:state, timestamp=1497809526080, value=CO
jsmith column=order:date, timestamp=1497809490021, value=10-18-2014
jsmith column=order:numb, timestamp=1497809526118, value=6666
njones column=addr:city, timestamp=1497809525860, value=miami
njones column=addr:state, timestamp=1497809526151, value=TX
njones column=order:numb, timestamp=1497809525941, value=5555
tsimmons column=addr:city, timestamp=1497809525998, value=dallas
tsimmons column=addr:state, timestamp=1497809526023, value=TX
4 row(s) in 0.0310 seconds
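In case the linked tutorial moves, the data scanned above can be recreated directly from the HBase shell. The commands below are a sketch reconstructed from the scan output rather than copied from the tutorial, so treat the exact values and syntax as an assumption and adjust as needed:

create '/user/user01/customer', {NAME=>'addr'}, {NAME=>'order'}
put '/user/user01/customer', 'jsmith', 'addr:city', 'denver'
put '/user/user01/customer', 'jsmith', 'addr:state', 'CO'
put '/user/user01/customer', 'jsmith', 'order:numb', '6666'
put '/user/user01/customer', 'jsmith', 'order:date', '10-18-2014'
put '/user/user01/customer', 'njones', 'addr:city', 'miami'
put '/user/user01/customer', 'njones', 'addr:state', 'TX'
put '/user/user01/customer', 'njones', 'order:numb', '5555'
put '/user/user01/customer', 'tsimmons', 'addr:city', 'dallas'
put '/user/user01/customer', 'tsimmons', 'addr:state', 'TX'
put '/user/user01/customer', 'amiller', 'addr:state', 'TX'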
Step 2: Next, read this table in Spark. I used spark-shell for this; the keyValueRDD built at the end is what we are after.
[mapr@maprdemo ~]$ /opt/mapr/spark/spark-2.1.0/bin/spark-shell
Spark context Web UI available at http://10.0.2.15:4040
Spark context available as 'sc' (master = local[*], app id = local-1497831718510).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/ '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.1.0-mapr-1703
      /_/
Using Scala version 2.11.8 (OpenJDK 64-Bit Server VM, Java 1.8.0_91)
Type in expressions to have them evaluated.
Type :help for more information.
scala>
scala> import org.apache.spark._
import org.apache.spark._
scala> import org.apache.spark.rdd.NewHadoopRDD
import org.apache.spark.rdd.NewHadoopRDD
scala> import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor}
import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor}
scala> import org.apache.hadoop.hbase.client.HBaseAdmin
import org.apache.hadoop.hbase.client.HBaseAdmin
scala> import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
scala> import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.Path
scala> import org.apache.hadoop.hbase.HColumnDescriptor
import org.apache.hadoop.hbase.HColumnDescriptor
scala> import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.util.Bytes
scala> import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Put
scala> import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.HTable
scala>
scala>
scala> val tableName="/user/user01/customer"
tableName: String = /user/user01/customer
scala>
scala>
scala> val hconf = HBaseConfiguration.create()
hconf: org.apache.hadoop.conf.Configuration = Configuration: core-default.xml, org.apache.hadoop.conf.CoreDefaultProperties, core-site.xml, mapred-default.xml, org.apache.hadoop.mapreduce.conf.MapReduceDefaultProperties, mapred-site.xml, yarn-default.xml, org.apache.hadoop.yarn.conf.YarnDefaultProperties, yarn-site.xml, hdfs-default.xml, hdfs-site.xml, hbase-default.xml, hbase-site.xml
scala>
scala> hconf.set(TableInputFormat.INPUT_TABLE, tableName)
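If you only need a subset of the table, the same configuration object can also narrow the scan before Spark reads anything. This is optional and not part of the original session; the column family name here is just the one from the sample data:

// Optional (assumption, not used in the session below): limit the scan to the addr column family
hconf.set(TableInputFormat.SCAN_COLUMN_FAMILY, "addr")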
scala>
scala>
scala>
scala> val admin = new HBaseAdmin(hconf)
warning: there was one deprecation warning; re-run with -deprecation for details
admin: org.apache.hadoop.hbase.client.HBaseAdmin = org.apache.hadoop.hbase.client.HBaseAdmin@2093bb6c
scala>
scala> val hBaseRDD = sc.newAPIHadoopRDD(hconf, classOf[TableInputFormat], classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable], classOf[org.apache.hadoop.hbase.client.Result])
hBaseRDD: org.apache.spark.rdd.RDD[(org.apache.hadoop.hbase.io.ImmutableBytesWritable, org.apache.hadoop.hbase.client.Result)] = NewHadoopRDD[0] at newAPIHadoopRDD at <console>:38
scala> val result = hBaseRDD.count()
result: Long = 4
scala>
scala> val resultRDD = hBaseRDD.map(tuple => tuple._2)
resultRDD: org.apache.spark.rdd.RDD[org.apache.hadoop.hbase.client.Result] = MapPartitionsRDD[1] at map at <console>:40
scala> resultRDD
res1: org.apache.spark.rdd.RDD[org.apache.hadoop.hbase.client.Result] = MapPartitionsRDD[1] at map at <console>:40
scala> val keyValueRDD = resultRDD.map(result =>
     | (Bytes.toString(result.getRow()).split(" ")(0),
     | Bytes.toString(result.value)))
keyValueRDD: org.apache.spark.rdd.RDD[(String, String)] = MapPartitionsRDD[2] at map at <console>:44
scala> keyValueRDD.collect()
res2: Array[(String, String)] = Array((amiller,TX), (jsmith,denver), (njones,miami), (tsimmons,dallas))
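Note that result.value returns only the first cell of each row, which is why amiller (who has no addr:city) shows a state while the other rows show a city. To pull specific columns instead, something along the following lines should work; cityStateRDD and the empty-string default are my own additions, not part of the original session:

val cityStateRDD = resultRDD.map { result =>
  val key = Bytes.toString(result.getRow)
  // getValue returns null when the row has no such cell, so wrap it in Option
  val city = Option(result.getValue(Bytes.toBytes("addr"), Bytes.toBytes("city")))
    .map(b => Bytes.toString(b)).getOrElse("")
  val state = Option(result.getValue(Bytes.toBytes("addr"), Bytes.toBytes("state")))
    .map(b => Bytes.toString(b)).getOrElse("")
  (key, city, state)
}
// Given the data above, collect() should yield something like:
// (amiller,"",TX), (jsmith,denver,CO), (njones,miami,TX), (tsimmons,dallas,TX)
cityStateRDD.collect()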