Step 1: Create a dummy table called customer in HBase. Refer to this link for how to populate the table: https://mapr.com/products/mapr-sandbox-hadoop/tutorials/tutorial-getting-started-with-hbase-shell/
hbase(main):004:0> scan '/user/user01/customer'
ROW COLUMN+CELL
amiller column=addr:state, timestamp=1497809527266, value=TX
jsmith column=addr:city, timestamp=1497809526053, value=denver
jsmith column=addr:state, timestamp=1497809526080, value=CO
jsmith column=order:date, timestamp=1497809490021, value=10-18-2014
jsmith column=order:numb, timestamp=1497809526118, value=6666
njones column=addr:city, timestamp=1497809525860, value=miami
njones column=addr:state, timestamp=1497809526151, value=TX
njones column=order:numb, timestamp=1497809525941, value=5555
tsimmons column=addr:city, timestamp=1497809525998, value=dallas
tsimmons column=addr:state, timestamp=1497809526023, value=TX
4 row(s) in 0.0310 seconds
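The linked tutorial populates the table from the HBase shell. If you would rather do it from code, here is a minimal Scala sketch using the HBase client API (the same Put/HTable classes imported in Step 2). The row key, column families and values mirror the scan output above; the deprecated HTable constructor and the HBase 1.x addColumn method are assumptions about the client version on the sandbox.

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{HTable, Put}
import org.apache.hadoop.hbase.util.Bytes

val conf = HBaseConfiguration.create()
// deprecated constructor, kept here only to stay close to the imports used in Step 2
val table = new HTable(conf, "/user/user01/customer")

// one Put per row key; addColumn(family, qualifier, value) is the HBase 1.x name (add() on older clients)
val put = new Put(Bytes.toBytes("jsmith"))
put.addColumn(Bytes.toBytes("addr"), Bytes.toBytes("city"), Bytes.toBytes("denver"))
put.addColumn(Bytes.toBytes("addr"), Bytes.toBytes("state"), Bytes.toBytes("CO"))
put.addColumn(Bytes.toBytes("order"), Bytes.toBytes("date"), Bytes.toBytes("10-18-2014"))
put.addColumn(Bytes.toBytes("order"), Bytes.toBytes("numb"), Bytes.toBytes("6666"))
table.put(put)
table.close()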
Step 2: Next, read this table in Spark. I used spark-shell to read the table; keyValueRDD is what we are after.
[mapr@maprdemo ~]$ /opt/mapr/spark/spark-2.1.0/bin/spark-shell
Spark context Web UI available at http://10.0.2.15:4040
Spark context available as 'sc' (master = local[*], app id = local-1497831718510).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.1.0-mapr-1703
      /_/
Using Scala version 2.11.8 (OpenJDK 64-Bit Server VM, Java 1.8.0_91)
Type in expressions to have them evaluated.
Type :help for more information.
scala>
scala> import org.apache.spark._
import org.apache.spark._
scala> import org.apache.spark.rdd.NewHadoopRDD
import org.apache.spark.rdd.NewHadoopRDD
scala> import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor}
import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor}
scala> import org.apache.hadoop.hbase.client.HBaseAdmin
import org.apache.hadoop.hbase.client.HBaseAdmin
scala> import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
scala> import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.Path
scala> import org.apache.hadoop.hbase.HColumnDescriptor
import org.apache.hadoop.hbase.HColumnDescriptor
scala> import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.util.Bytes
scala> import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Put
scala> import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.HTable
scala>
scala>
scala> val tableName="/user/user01/customer"
tableName: String = /user/user01/customer
scala>
scala>
scala> val hconf = HBaseConfiguration.create()
hconf: org.apache.hadoop.conf.Configuration = Configuration: core-default.xml, org.apache.hadoop.conf.CoreDefaultProperties, core-site.xml, mapred-default.xml, org.apache.hadoop.mapreduce.conf.MapReduceDefaultProperties, mapred-site.xml, yarn-default.xml, org.apache.hadoop.yarn.conf.YarnDefaultProperties, yarn-site.xml, hdfs-default.xml, hdfs-site.xml, hbase-default.xml, hbase-site.xml
scala>
scala> hconf.set(TableInputFormat.INPUT_TABLE, tableName)
scala>
scala>
scala>
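Optionally, the scan can be narrowed to specific columns before the RDD is created. This snippet is a sketch and not part of the captured session; it assumes your TableInputFormat exposes the SCAN_COLUMNS setting (a space-separated list of family:qualifier pairs):

// only fetch the address columns instead of whole rows
hconf.set(TableInputFormat.SCAN_COLUMNS, "addr:city addr:state")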
scala> val admin = new HBaseAdmin(hconf)
warning: there was one deprecation warning; re-run with -deprecation for details
admin: org.apache.hadoop.hbase.client.HBaseAdmin = org.apache.hadoop.hbase.client.HBaseAdmin@2093bb6c
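The HBaseAdmin handle is not used again in this walkthrough, but it can be used to fail fast when the table path is wrong. A small sketch, not part of the captured session:

// sanity check before scanning
if (!admin.isTableAvailable(tableName)) {
  println(s"Table $tableName is not available")
}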
scala>
scala> val hBaseRDD = sc.newAPIHadoopRDD(hconf, classOf[TableInputFormat], classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable], classOf[org.apache.hadoop.hbase.client.Result])
hBaseRDD: org.apache.spark.rdd.RDD[(org.apache.hadoop.hbase.io.ImmutableBytesWritable, org.apache.hadoop.hbase.client.Result)] = NewHadoopRDD[0] at newAPIHadoopRDD at <console>:38
scala> val result = hBaseRDD.count()
result: Long = 4
scala>
scala> val resultRDD = hBaseRDD.map(tuple => tuple._2)
resultRDD: org.apache.spark.rdd.RDD[org.apache.hadoop.hbase.client.Result] = MapPartitionsRDD[1] at map at <console>:40
scala> resultRDD
res1: org.apache.spark.rdd.RDD[org.apache.hadoop.hbase.client.Result] = MapPartitionsRDD[1] at map at <console>:40
scala> val keyValueRDD = resultRDD.map(result =>
| (Bytes.toString(result.getRow()).split(" ")(0),
| Bytes.toString(result.value)))
keyValueRDD: org.apache.spark.rdd.RDD[(String, String)] = MapPartitionsRDD[2] at map at <console>:44
scala> keyValueRDD.collect()
res2: Array[(String, String)] = Array((amiller,TX), (jsmith,denver), (njones,miami), (tsimmons,dallas))
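Note that Result.value returns only the first cell of each row, which is why jsmith comes back as denver (addr:city sorts before addr:state) and amiller as TX (its only column). To pull specific columns instead, use getValue(family, qualifier). A sketch, not part of the captured session; rows that lack a column come back as null:

val cityStateRDD = resultRDD.map(result =>
  (Bytes.toString(result.getRow()),
   // getValue returns null when the row has no such column, e.g. amiller has no addr:city
   Bytes.toString(result.getValue(Bytes.toBytes("addr"), Bytes.toBytes("city"))),
   Bytes.toString(result.getValue(Bytes.toBytes("addr"), Bytes.toBytes("state")))))
cityStateRDD.collect()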