
What is the right way to load data from HBase, manipulate it, and then convert it to a JavaRDD?

1 Answer


The TableInputFormat class is in hbase-server.jar, so you need to add that dependency to your pom.xml file. (This applies to HBase 1.x; in HBase 2.x the class moved to the hbase-mapreduce artifact.) See the thread "HBase and non-existent TableInputFormat" on the Spark user list.

<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-server</artifactId>
    <version>1.3.0</version>
</dependency>

Below is sample code (in Java and Scala) for reading from HBase with Spark.

In Java:

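import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class HBaseRead {
  public static void main(String[] args) throws Exception {
    SparkConf sparkConf = new SparkConf().setAppName("HBaseRead").setMaster("local[*]");
    JavaSparkContext jsc = new JavaSparkContext(sparkConf);

    // Tell TableInputFormat which HBase table to scan.
    Configuration hbaseConf = HBaseConfiguration.create();
    hbaseConf.set(TableInputFormat.INPUT_TABLE, "my_table");

    // Each element is a (row key, row contents) pair.
    JavaPairRDD<ImmutableBytesWritable, Result> javaPairRdd =
        jsc.newAPIHadoopRDD(hbaseConf, TableInputFormat.class, ImmutableBytesWritable.class, Result.class);

    jsc.stop();
  }
}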
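Since the question also asks about manipulating the data and ending up with a JavaRDD, here is a minimal sketch of the decoding step that usually comes first. HBase hands back row keys and cell values as raw byte[], so they have to be decoded before you can do much with them. To keep the sketch runnable without Spark or HBase on the classpath it uses only the JDK. The class name CellDecoding and its methods are made up for illustration, and it assumes strings were stored as UTF-8 and longs as 8-byte big-endian values.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical helper (not part of HBase or Spark): decodes the raw bytes
// that HBase returns for row keys and cell values.
class CellDecoding {

    // Decode a row key or cell value that was stored as UTF-8 text.
    static String decodeString(byte[] raw) {
        return new String(raw, StandardCharsets.UTF_8);
    }

    // Decode a cell value that was stored as an 8-byte big-endian long.
    static long decodeLong(byte[] raw) {
        if (raw.length != Long.BYTES) {
            throw new IllegalArgumentException("expected 8 bytes, got " + raw.length);
        }
        return ByteBuffer.wrap(raw).getLong();
    }

    public static void main(String[] args) {
        // Sample inputs standing in for bytes read from HBase.
        byte[] rowKey = "row-1".getBytes(StandardCharsets.UTF_8);
        byte[] counter = ByteBuffer.allocate(Long.BYTES).putLong(42L).array();

        System.out.println(decodeString(rowKey));
        System.out.println(decodeLong(counter));
    }
}
```

With the pair RDD from the Java example, something like javaPairRdd.map(t -> CellDecoding.decodeString(t._2().getRow())) should then give you a JavaRDD<String> of row keys. In real code, HBase's own Bytes.toString and Bytes.toLong utilities cover the same decoding.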

In Scala:

import org.apache.hadoop.hbase.client.{HBaseAdmin, Result}
import org.apache.hadoop.hbase.{HBaseConfiguration, HColumnDescriptor, HTableDescriptor}
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.spark._

object HBaseRead {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("HBaseRead").setMaster("local[2]")
    val sc = new SparkContext(sparkConf)
    val conf = HBaseConfiguration.create()
    val tableName = "table1"

    // Connection settings for a local, unsecured cluster; adjust for your environment.
    System.setProperty("user.name", "hdfs")
    System.setProperty("HADOOP_USER_NAME", "hdfs")
    conf.set("hbase.master", "localhost:60000")
    conf.setInt("timeout", 120000)
    conf.set("hbase.zookeeper.quorum", "localhost")
    conf.set("zookeeper.znode.parent", "/hbase-unsecure")
    conf.set(TableInputFormat.INPUT_TABLE, tableName)

    // Create the table if it does not exist yet.
    val admin = new HBaseAdmin(conf)
    if (!admin.isTableAvailable(tableName)) {
      val tableDesc = new HTableDescriptor(tableName)
      // A table needs at least one column family; "cf" is a placeholder name.
      tableDesc.addFamily(new HColumnDescriptor("cf"))
      admin.createTable(tableDesc)
    }
    admin.close()

    // Each element is a (row key, row contents) pair.
    val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat], classOf[ImmutableBytesWritable], classOf[Result])
    println("Number of Records found : " + hBaseRDD.count())

    sc.stop()
  }
}
