温馨提示×

温馨提示×

您好,登录后才能下订单哦!

密码登录×
登录注册×
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》

如何解析spark-streaming中的socketTextStream

发布时间:2021-12-17 13:56:07 来源:亿速云 阅读:158 作者:柒染 栏目:大数据

本篇文章为大家展示了如何解析spark-streaming中的socketTextStream,内容简明扼要并且容易理解,绝对能使你眼前一亮,通过这篇文章的详细介绍希望你能有所收获。

package hgs.spark.streaming
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.storage.StorageLevel
import org.apache.spark.HashPartitioner
object SocketStreamingTest {
  def main(args: Array[String]): Unit = {
    
    val conf = new SparkConf();
    conf.setMaster("local[2]").setAppName("SocketStreaming")
    val context = new SparkContext(conf);
    //要添加spark-streaming的依赖包,spark的Seconds
    val streamContext  = new StreamingContext(context,Seconds(5));
    val ds = streamContext.socketTextStream("192.168.6.129", 8888, StorageLevel.MEMORY_ONLY);
    streamContext.checkpoint("d:\\chekpoint")
    //val ds2 = ds.flatMap(_.split(" ")).map((_,1)).reduceByKey((x,y)=>(x+y))//这种方式只是对该批次数据进行处理,并没有累计上一个批次
    
    
    //updateFunc: (Iterator[(K, Seq[V], Option[S])]) K:单词, Seq[V]该批次单词出现次数列表,Option:上一次计算的结果
    val updateFunc=(iter:Iterator[(String,Seq[Int],Option[Int])])=>{
      //iter.flatMap(it=>Some(it._2.sum+it._3.getOrElse(0)).map((it._1,_)))//方式一
      //iter.flatMap{case(x,y,z)=>{Some(y.sum+z.getOrElse(0)).map((x,_))}}//方式二
      iter.flatMap(it=>Some(it._1,(it._2.sum.toInt+it._3.getOrElse(0))))//方式三
    }
    val partitionner = new HashPartitioner(2)
    //通过updateStatByKey来进行累加
    val ds2 = ds.flatMap(_.split(" ")).map((_,1)).updateStateByKey(updateFunc, partitionner, true)
    //打印
    ds2.print()
    streamContext.start()
    streamContext.awaitTermination()
  }
}

上述内容就是如何解析spark-streaming中的socketTextStream,你们学到知识或技能了吗?如果还想学到更多技能或者丰富自己的知识储备,欢迎关注亿速云行业资讯频道。

向AI问一下细节

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

AI