温馨提示×

HBase与Spark在CentOS上如何结合使用

小樊
32
2025-12-07 21:59:45
栏目: 智能运维

在 CentOS 上结合 Spark 与 HBase 的实用指南

一 环境准备与版本建议

  • 操作系统与基础组件:建议使用 CentOS 7/8JDK 8Hadoop 3.xSpark 3.xHBase 2.x。示例环境:CentOS 7 + Hadoop 3.2.0 + Spark 3.1.2 + HBase 2.2.7 + Scala 2.12 + JDK 8
  • 网络与连通性:确保 Spark 节点HBase/ZooKeeper 网络互通,必要时配置 /etc/hosts 或使用内网域名;云上场景需将客户端 IP 加入 白名单 并优先使用 VPC 内网 访问以降低时延。
  • 服务启动顺序建议:先启动 ZooKeeperHDFS(离开安全模式)→ HBase;如需可视化或 Thrift 访问,再启动 HBase Thrift(默认端口 9090,部分发行版为 54001)。

二 依赖与配置

  • 依赖管理(Maven 示例,适用于 Spark 3.x + HBase 2.2.x/2.4.x)
    • 核心依赖:hbase-client、hbase-common、hbase-server、hbase-mapreduce(版本与集群保持一致)。
    • 可选:若使用 Spark SQL 与 HBase 集成的高级封装,可引入 hadoop-hbase-spark(如 org.apache.hadoop.hbase.spark:hbase-spark:2.4.9)。
    • 示例依赖片段:
      <properties>
        <scala.version>2.12.18</scala.version>
        <spark.version>3.1.2</spark.version>
        <hbase.version>2.2.7</hbase.version>
      </properties>
      <dependencies>
        <dependency>
          <groupId>org.apache.hbase</groupId>
          <artifactId>hbase-client</artifactId>
          <version>${hbase.version}</version>
        </dependency>
        <dependency>
          <groupId>org.apache.hbase</groupId>
          <artifactId>hbase-common</artifactId>
          <version>${hbase.version}</version>
        </dependency>
        <dependency>
          <groupId>org.apache.hbase</groupId>
          <artifactId>hbase-server</artifactId>
          <version>${hbase.version}</version>
        </dependency>
        <dependency>
          <groupId>org.apache.hbase</groupId>
          <artifactId>hbase-mapreduce</artifactId>
          <version>${hbase.version}</jobConf>
        </dependency>
        <!-- 可选:Spark SQL 与 HBase 集成 -->
        <dependency>
          <groupId>org.apache.hadoop.hbase.spark</groupId>
          <artifactId>hbase-spark</artifactId>
          <version>2.4.9</version>
        </dependency>
      </dependencies>
      
  • 客户端配置
    • 方式一:将 $HBASE_HOME/conf/hbase-site.xml 分发到 $SPARK_HOME/conf/,或在提交任务时用 –files 携带该文件。
    • 方式二:在代码中显式设置连接参数(推荐在集群外显式指定,避免误连 localhost)。
    • 常用关键项:
      • hbase.zookeeper.quorum(ZooKeeper 主机列表)
      • hbase.zookeeper.property.clientPort(默认 2181
      • 云上增强版需使用控制台提供的 Java API 访问地址(端口可能为 30020 等)。

三 读取与写入方式

  • 方式 A:HBaseContext + bulkPut(推荐,适合批量写入与复杂作业)
    • 适用场景:大规模批处理、需要多次读写 HBase 的作业。
    • 核心步骤:创建 HBaseConfiguration → 构建 HBaseContext → 构造 Put 集合 → 调用 bulkPut
    • 示例(Scala):
      import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
      import org.apache.hadoop.hbase.client.Put
      import org.apache.hadoop.hbase.util.Bytes
      import org.apache.hadoop.hbase.spark.HBaseContext
      import org.apache.spark.sql.SparkSession
      
      val spark = SparkSession.builder().appName("HBaseBulkPut").getOrCreate()
      val sc = spark.sparkContext
      
      val conf = HBaseConfiguration.create()
      conf.set("hbase.zookeeper.quorum", "zk1,zk2,zk3")
      conf.set("hbase.zookeeper.property.clientPort", "2181")
      
      val hbaseContext = new HBaseContext(sc, conf)
      val tableName = TableName.valueOf("my_table")
      val cf = "cf"
      
      val puts = (1 to 1000).map { i =>
        val put = new Put(Bytes.toBytes(s"row$i"))
        put.addColumn(Bytes.toBytes(cf), Bytes.toBytes("qual"), Bytes.toBytes(s"val$i"))
        put
      }
      
      hbaseContext.bulkPut(tableName, puts)
      spark.stop()
      
  • 方式 B:newAPIHadoopRDD / saveAsNewAPIHadoopDataset(MapReduce 接口,适合简单 ETL)
    • 适用场景:一次性导入/导出、与既有 MR 作业兼容。
    • 示例(读取为 RDD[ImmutableBytesWritable, Result]):
      import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
      import org.apache.hadoop.hbase.client.Result
      import org.apache.hadoop.hbase.io.ImmutableBytesWritable
      import org.apache.hadoop.hbase.mapreduce.TableInputFormat
      import org.apache.hadoop.hbase.util.Bytes
      import org.apache.spark.sql.SparkSession
      
      val spark = SparkSession.builder().appName("HBaseMRRead").getOrCreate()
      val sc = spark.sparkContext
      
      val conf = HBaseConfiguration.create()
      conf.set("hbase.zookeeper.quorum", "zk1,zk2,zk3")
      conf.set(TableInputFormat.INPUT_TABLE, "my_table")
      
      val hbaseRDD = sc.newAPIHadoopRDD(
        conf,
        classOf[TableInputFormat],
        classOf[ImmutableBytesWritable],
        classOf[Result]
      )
      
      hbaseRDD.foreach { case (_, result) =>
        val v = Bytes.toString(result.getValue(Bytes.toBytes("cf"), Bytes.toBytes("qual")))
        println(v)
      }
      spark.stop()
      
  • 方式 C:Spark SQL + HBase-Spark(DataFrame 读写,便于 SQL 分析)
    • 适用场景:需要以 DataFrame 进行 SQL 分析、与 Spark 生态无缝衔接。
    • 示例(读取):
      import org.apache.spark.sql.SparkSession
      import org.apache.hadoop.hbase.spark.HBaseContext
      import org.apache.hadoop.hbase.HBaseConfiguration
      
      val spark = SparkSession.builder()
        .appName("HBaseSparkSQLRead")
        .config("spark.sql.extensions", "org.apache.hadoop.hbase.spark.HBaseSparkSQL")
        .config("spark.sql.catalog.hbase", "org.apache.hadoop.hbase.spark.HBaseCatalog")
        .getOrCreate()
      
      val hbaseConf = HBaseConfiguration.create()
      hbaseConf.set("hbase.zookeeper.quorum", "zk1,zk2,zk3")
      hbaseConf.set("hbase.zookeeper.property.clientPort", "2181")
      
      // 注册 HBase 表为临时视图后使用 SQL
      spark.sql(
        s"""
           |CREATE TABLE hbase.${"my_table"} (
           |  rowkey STRING,
           |  cf ROW qual STRING
           |)
           |USING org.apache.hadoop.hbase.spark
           |OPTIONS (
           |  table '${"my_table"}',
           |  zookeeperQuorum '${"zk1,zk2,zk3:2181"}'
           |)
         """.stripMargin)
      
      spark.sql("SELECT * FROM hbase.my_table LIMIT 10").show()
      spark.stop()
      
  • 方式 D:Phoenix + Spark SQL(可选,适合二级索引与 SQL 友好场景)
    • 适用场景:需要 二级索引、复杂 SQL、JDBC/ODBC 访问。
    • 要点:在 Spark 中通过 Phoenix JDBCspark-hbase-connector 读写 Phoenix 表(底层映射 HBase 表)。

四 提交任务与常见问题

  • 提交命令示例(将 hbase-site.xml 随任务分发)
    spark-submit \
      --master yarn \
      --deploy-mode cluster \
      --class org.example.HBaseJob \
      --files /opt/hbase/conf/hbase-site.xml \
      --jars /opt/hbase/lib/hbase-client-2.2.7.jar,/opt/hbase/lib/hbase-common-2.2.7.jar,/opt/hbase/lib/hbase-server-2.2.7.jar \
      /opt/apps/hbase-spark-demo.jar
    
  • 常见问题与排查
    • 连接被拒绝或连到 localhost:未正确设置 hbase.zookeeper.quorum 或未分发 hbase-site.xml,导致默认连接 localhost:2181。请在代码或配置中显式指定 ZooKeeper 地址。
    • 缺少依赖导致反复重连或类冲突:确保 HBase 相关 JAR 在 classpath 中,且版本与集群一致;避免不同版本 guava 等库冲突。
    • 权限与网络:云上需将客户端 IP 加入 白名单;跨 VPC/公网访问需保证 同一地域/VPC 与相应端口放通。
    • 表不存在:建议先在 HBase Shell 创建表与列族,再让 Spark 写入/读取。
    • Thrift/可视化:如需通过 Thrift 访问 HBase(如 Python 客户端),需启动 HBase Thrift 服务(常见端口 9090/54001)。

0