A Practical Guide to Using Spark with HBase on CentOS
1. Environment Preparation and Version Recommendations
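This guide assumes a CentOS cluster with HDFS, ZooKeeper, Spark, and HBase already installed and running. The versions used throughout are Spark 3.1.2 and HBase 2.2.7 on Scala 2.12; keep your application's Scala version aligned with the Spark build, and adjust the version properties below if your cluster differs.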
2. Dependencies and Configuration
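Declare the required artifacts in your project's pom.xml. Spark itself is marked provided because the cluster supplies it at runtime: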
<properties>
  <scala.version>2.12.18</scala.version>
  <spark.version>3.1.2</spark.version>
  <hbase.version>2.2.7</hbase.version>
</properties>
<dependencies>
  <!-- Spark core and SQL, provided by the cluster at runtime -->
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.12</artifactId>
    <version>${spark.version}</version>
    <scope>provided</scope>
  </dependency>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.12</artifactId>
    <version>${spark.version}</version>
    <scope>provided</scope>
  </dependency>
  <dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-client</artifactId>
    <version>${hbase.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-common</artifactId>
    <version>${hbase.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-server</artifactId>
    <version>${hbase.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-mapreduce</artifactId>
    <version>${hbase.version}</version>
  </dependency>
  <!-- Optional: Spark SQL / HBase connector from the Apache hbase-connectors
       project. The released 1.0.0 targets Spark 2.4 and HBase 2.1; for a
       Spark 3.x / HBase 2.2 cluster you may need to build hbase-connectors
       from source against your versions. -->
  <dependency>
    <groupId>org.apache.hbase.connectors.spark</groupId>
    <artifactId>hbase-spark</artifactId>
    <version>1.0.0</version>
  </dependency>
</dependencies>
3. Reading and Writing Approaches
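The first approach writes in bulk through HBaseContext.bulkPut, provided by the hbase-spark connector: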
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.spark.HBaseContext
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder().appName("HBaseBulkPut").getOrCreate()
val sc = spark.sparkContext
val conf = HBaseConfiguration.create()
conf.set("hbase.zookeeper.quorum", "zk1,zk2,zk3")
conf.set("hbase.zookeeper.property.clientPort", "2181")
val hbaseContext = new HBaseContext(sc, conf)
val tableName = TableName.valueOf("my_table")
val cf = "cf"
// bulkPut takes an RDD plus a function that turns each element into a Put;
// it does not accept a local collection of Puts
val rdd = sc.parallelize(1 to 1000)
hbaseContext.bulkPut[Int](rdd, tableName, (i: Int) => {
  val put = new Put(Bytes.toBytes(s"row$i"))
  put.addColumn(Bytes.toBytes(cf), Bytes.toBytes("qual"), Bytes.toBytes(s"val$i"))
  put
})
spark.stop()
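Reading does not require the connector: TableInputFormat from the hbase-mapreduce module plugs directly into newAPIHadoopRDD: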
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder().appName("HBaseMRRead").getOrCreate()
val sc = spark.sparkContext
val conf = HBaseConfiguration.create()
conf.set("hbase.zookeeper.quorum", "zk1,zk2,zk3")
conf.set(TableInputFormat.INPUT_TABLE, "my_table")
val hbaseRDD = sc.newAPIHadoopRDD(
  conf,
  classOf[TableInputFormat],
  classOf[ImmutableBytesWritable],
  classOf[Result]
)
// Convert to plain strings before bringing rows to the driver: Result is not
// Java-serializable, and foreach(println) on the RDD would print on the
// executors rather than the driver
val values = hbaseRDD.map { case (_, result) =>
  Bytes.toString(result.getValue(Bytes.toBytes("cf"), Bytes.toBytes("qual")))
}
values.take(10).foreach(println)
spark.stop()
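With the connector on the classpath, an HBase table can also be registered as a Spark SQL view and queried with plain SQL: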
import org.apache.spark.sql.SparkSession
import org.apache.hadoop.hbase.spark.HBaseContext
import org.apache.hadoop.hbase.HBaseConfiguration
val spark = SparkSession.builder().appName("HBaseSparkSQLRead").getOrCreate()
val sc = spark.sparkContext
val hbaseConf = HBaseConfiguration.create()
hbaseConf.set("hbase.zookeeper.quorum", "zk1,zk2,zk3")
hbaseConf.set("hbase.zookeeper.property.clientPort", "2181")
// The connector's data source picks up the most recently created HBaseContext,
// so create one before registering the table
new HBaseContext(sc, hbaseConf)
// Register the HBase table as a temporary view; hbase.columns.mapping binds
// Spark columns to the row key and to cf:qual
spark.sql(
  """CREATE TEMPORARY VIEW my_table
    |USING org.apache.hadoop.hbase.spark
    |OPTIONS (
    |  'hbase.table' 'my_table',
    |  'hbase.columns.mapping' 'rowkey STRING :key, qual STRING cf:qual'
    |)
  """.stripMargin)
spark.sql("SELECT * FROM my_table LIMIT 10").show()
spark.stop()
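For scan-style reads through the connector itself, HBaseContext also exposes an hbaseRDD method that distributes a Scan across the table's regions. A minimal sketch, assuming the same my_table / cf:qual layout as the examples above:

import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.Scan
import org.apache.hadoop.hbase.spark.HBaseContext
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("HBaseScanRead").getOrCreate()
val sc = spark.sparkContext
val conf = HBaseConfiguration.create()
conf.set("hbase.zookeeper.quorum", "zk1,zk2,zk3")
conf.set("hbase.zookeeper.property.clientPort", "2181")
val hbaseContext = new HBaseContext(sc, conf)

// Restrict the scan to one column family to cut down data transferred
val scan = new Scan()
scan.addFamily(Bytes.toBytes("cf"))

// hbaseRDD yields an RDD of (ImmutableBytesWritable, Result) pairs,
// one partition per region
val rdd = hbaseContext.hbaseRDD(TableName.valueOf("my_table"), scan)
println(s"rows scanned: ${rdd.count()}")
spark.stop()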
4. Job Submission and Common Issues
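A typical YARN submission ships hbase-site.xml to the executors with --files (so they can locate ZooKeeper) and passes the HBase jars with --jars; building a single assembly (fat) jar avoids the long --jars list: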
spark-submit \
--master yarn \
--deploy-mode cluster \
--class org.example.HBaseJob \
--files /opt/hbase/conf/hbase-site.xml \
--jars /opt/hbase/lib/hbase-client-2.2.7.jar,/opt/hbase/lib/hbase-common-2.2.7.jar,/opt/hbase/lib/hbase-server-2.2.7.jar,/opt/hbase/lib/hbase-mapreduce-2.2.7.jar \
/opt/apps/hbase-spark-demo.jar
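Common issues: NoClassDefFoundError for HBase classes usually means a jar is missing from --jars or from the assembly; ZooKeeper connection timeouts usually mean hbase-site.xml was not shipped or hbase.zookeeper.quorum is not reachable from the executors; and NoSuchMethodError at runtime typically points to a version mismatch between the hbase-spark connector and the cluster's Spark or HBase build.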