温馨提示×

温馨提示×

您好,登录后才能下订单哦!

密码登录×
登录注册×
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》

Kafka+SparkStream+Hive的项目实现方法是什么

发布时间:2021-11-22 10:01:03 来源:亿速云 阅读:120 作者:iii 栏目:大数据

本篇内容主要讲解“Kafka+SparkStream+Hive的项目实现方法是什么”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“Kafka+SparkStream+Hive的项目实现方法是什么”吧!

目前的项目中需要将kafka队列的数据实时存到hive表中。

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Row, SaveMode, SparkSession}
import org.apache.spark.streaming.{Durations, Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, ConsumerStrategies, HasOffsetRanges, KafkaUtils, LocationStrategies, OffsetRange}
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
  def main(args: Array[String]): Unit = {
      //    val conf = new SparkConf()
      //    conf.setMaster("local")
      //    conf.setAppName("SparkStreamingOnKafkaDirect")
      val spark = SparkSession.builder().appName("test").master("local").enableHiveSupport().getOrCreate()
      val ssc = new StreamingContext(spark.sparkContext, Durations.seconds(3))
      //设置日志级别
      ssc.sparkContext.setLogLevel("Error")

      val kafkaParams = Map[String, Object](
        "bootstrap.servers" -> "node01:9092,node02:9092,node03:9092",
        "key.deserializer" -> classOf[StringDeserializer],
        "value.deserializer" -> classOf[StringDeserializer],
        "group.id" -> "MyGroupId", //

        /**
         * 当没有初始的offset,或者当前的offset不存在,如何处理数据
         * earliest :自动重置偏移量为最小偏移量
         * latest:自动重置偏移量为最大偏移量【默认】
         * none:没有找到以前的offset,抛出异常
         */
        "auto.offset.reset" -> "earliest",

        /**
         * 当设置 enable.auto.commit为false时,不会自动向kafka中保存消费者offset.需要异步的处理完数据之后手动提交
         */
        "enable.auto.commit" -> (false: java.lang.Boolean) //默认是true
      )

      //设置Kafka的topic
      val topics = Array("test")
      //创建与Kafka的连接,接收数据
      /*这里接收到数据的样子
      2019-09-26  1569487411604   1235    497 Kafka   Register
      2019-09-26  1569487411604   1235    497 Kafka   Register
      2019-09-26  1569487414838   390    778  Flink   View
      */
      val stream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
        ssc,
        PreferConsistent, //
        Subscribe[String, String](topics, kafkaParams)
      )

      //对接收到的数据进行处理,打印出来接收到的key跟value,最后放回的是value
      val transStrem: DStream[String] = stream.map(record => {
        val key_value = (record.key, record.value)
        println("receive message key = " + key_value._1)
        println("receive message value = " + key_value._2)
        key_value._2
      })


      //这里用了一下动态创建的Schema
      val structType: StructType = StructType(List[StructField](
        StructField("Date_", StringType, nullable = true),
        StructField("Timestamp_", StringType, nullable = true),
        StructField("UserID", StringType, nullable = true),
        StructField("PageID", StringType, nullable = true),
        StructField("Channel", StringType, nullable = true),
        StructField("Action", StringType, nullable = true)
      ))

      //因为foreachRDD可以拿到封装到DStream中的rdd,可以对里面的rdd进行,
      /*代码解释:
          先从foreach中拿到一条数据,,在函数map中对接收来的数据用 “\n” 进行切分,放到Row中,用的是动态创建Schema,因为我们需要再将数据存储到hive中,所以需要Schema。
          因为map是transformance算子,所以用rdd.count()触发一下
           spark.createDataFrame:创建一个DataFrame,因为要注册一个临时表,必须用到DataFrame
           frame.createOrReplaceTempView("t1"):注册临时表
             spark.sql("use spark"):使用 hive 的 spark 库
           result.write.mode(SaveMode.Append).saveAsTable("test_kafka"):将数据放到 test_kafka 中
      */
      transStrem.foreachRDD(one => {
        val rdd: RDD[Row] = one.map({
          a =>
            val arr = a.toString.split("\t")
            Row(arr(0).toString, arr(1).toString, arr(2).toString, arr(3).toString, arr(4).toString, arr(5).toString)
        })
        rdd.count()
        val frame: DataFrame = spark.createDataFrame(rdd, structType)
        //      println(" Scheme: "+frame.printSchema())

        frame.createOrReplaceTempView("t1")
        //      spark.sql("select * from t1").show()
        spark.sql("use spark")
        spark.sql("select * from t1").
          write.mode(SaveMode.Append).saveAsTable("test_kafka")
      }
      )

      /**
       * 以上业务处理完成之后,异步的提交消费者offset,这里将 enable.auto.commit 设置成false,就是使用kafka 自己来管理消费者offset
       * 注意这里,获取 offsetRanges: Array[OffsetRange] 每一批次topic 中的offset时,必须从 源头读取过来的 stream中获取,不能从经过stream转换之后的DStream中获取。
       */
      stream.foreachRDD { rdd =>
        val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
        // some time later, after outputs have completed
        stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
      }
      ssc.start()
      ssc.awaitTermination()
      ssc.stop()
  }

到此,相信大家对“Kafka+SparkStream+Hive的项目实现方法是什么”有了更深的了解,不妨来实际操作一番吧!这里是亿速云网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!

向AI问一下细节

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

AI