温馨提示×

温馨提示×

您好,登录后才能下订单哦!

密码登录×
登录注册×
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》

Storm的优点有哪些

发布时间:2021-12-16 15:04:04 来源:亿速云 阅读:224 作者:iii 栏目:云计算

本篇内容介绍了“Storm的优点有哪些”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!

Storm和hadoop的区别

Storm的优点有哪些

  • 数据来源:HADOOP是HDFS上某个文件夹下的可能是成TB的数据,STORM是实时新增的某一笔数据

  • 处理过程:HADOOP是分MAP阶段到REDUCE阶段,STORM是由用户定义处理流程,流程中可以包含多个步骤,每个步骤可以是数据源(SPOUT)或处理逻辑(BOLT)

  • 是否结束:HADOOP最后是要结束的,STORM是没有结束状态,到最后一步时,就停在那,直到有新数据进入时再从头开始

  • 处理速度:HADOOP是以处理HDFS上大量数据为目的,速度慢,STORM是只要处理新增的某一笔数据即可可以做到很快。

  • 适用场景:HADOOP是在要处理一批数据时用的,不讲究时效性,要处理就提交一个JOB,STORM是要处理某一新增数据时用的,要讲时效性

  • 与MQ对比:HADOOP没有对比性,STORM可以看作是有N个步骤,每个步骤处理完就向下一个MQ发送消息,监听这个MQ的消费者继续处理

好的编程模型让开发者专注于业务逻辑;不好的编程模型让开发者把时间花费在通信,处理异常等琐事上.

编程模型例子:

  • 用hadoop的MapReduce和MPI做一个对比,在hadoop的MapReduce里面呢,它的编程模型里面呢,map和reduce,你只用去写map和reduce函数,以及一些简单的驱动,程序就能跑起来,你不用关心map和数据是怎么切分的,map和reduce是怎么传输的,reduce的数据是怎么写到hadoop的HDFS里面的,这些你都不用关心,看起来写mapreduce就是单机的代码,没有什么分布式的特点在里面啊,但是它运行的分布式框架来帮你做上述这些东西。

  • 反过来我们看写MPI的程序就完全不一样,写MPI的时候你就会很明显的感觉到你在写一个并行分布式程序,你需要在很多地方显式的去调数据传输的接口,你还要显式的去调一些数据同步的接口,这样才能把MPI程序显式的给RUN起来,这就是编程模型不同导致的不同的开发体验,其实这不仅仅是开发容易不容易的问题,更主要的是一个开发效率的问题,其实更简单的程序更能写出健壮的程序,写mapreduce程序是很简单的,但是要写出一个稳定靠谱的MPI程序就难一些

架构

  • Nimbus

  • Supervisor

  • Worker

编程模型:

  • DAG

  • Spout

  • Bolt

数据传输:

  • Zmq

    • Zmq也是开源的消息传递的框架,虽然叫mq,但它并不是一个message queue,而是一个封装的比较好的

  • Netty

    • netty是NIO的网络框架,效率比较高。之所以有netty是storm在apache之后呢,zmq的license和storm的license不兼容的,bolt处理完消息后会告诉Spout。

高可用性

  • 异常处理

  • 消息可靠性保证机制

可维护性:

  • Storm有个UI可以看跑在上面的程序监控

实时请求应答服务(同步),

  • 实时请求应答服务(同步),往往不是一个很简单的操作,而且大量的操作,用DAG模型来提高请求处理速度

  • DRPC

  • 实时请求处理

  • 例子:发送图片,或者图片地址,进行图片特征的提取

  • Storm的优点有哪些

  • 这里DRPC Server的好处是什么呢?这样看起来就像是一个Server,经过Spout,然后经过Bolt,不是更麻烦了吗?DRPC Server其实适用于分布式,可以应用分布式处理这个单个请求,来加速处理的过程。

    DRPCClientclient = new DRPCClient("drpc-host", 3772);
    String result = client.execute("reach","http://twitter.com");
    //服务端由四部分组成:包括一个DRPC Server,一个DPRC Spout,一个Topology和一个ReturnResult。


    Storm的优点有哪些

流式处理(异步)--- 不是说不快,而是不是等待结果

  • 逐条处理

    • 例子:ETL,把关心的数据提取,标准格式入库,它的特点是我把数据给你了,不用再返回给我,这个是异步的

  • 分析统计

    • 例子:日志PV,UV统计,访问热点统计,这类数据之间是有关联的,比如按某些字段做聚合,加和,平均等等

    • Storm的优点有哪些

  • 最后写到Redis,Hbase,MySQL,或者其他的MQ里面去给其他的系统去消费。

    /**
     * ShuffleGrouping("spout")就是从spout来订阅数据,fieldGrouping("split", new Fields("word"))
     * 实际上就是一个hash,同一个词有相同的hash,然后就会被hash到同一个WordCount的bolt里面,然后就
     * 可以进行计数。接下来两行呢是配置文件,然后是配置3个worker,接下来是通过Submitter提交Topology
     * 到Storm集群里面去。程序会编译打包,这段代码来自storm里面的starter的一段代码,这个代码怎么真正
     * 运行起来呢,就用storm jar 然后jar包的名,然后就是类的名字,和topology的名字,因为这里有个args[0]。
     * 这段代码很简单,首先呢,第一部分构造了一个DAG的有向无环图,然后生成配置,提交到Storm集群去。
     * */
    public static void main(String[] args) {
            TopologyBuilder builder = new TopologyBuilder();
            builder.setSpout("spout", new RandomSentenceSpout(), 5);
            builder.setBolt("split", new SplitSentence(), 8).shuffleGrouping("spout");
            builder.setBolt("count", new WordCount(), 12).fieldsGrouping("split", new Fields("word"));
    
            Config conf = new Config();
            conf.setDebug(true);
            
            if(args != null && args.length > 0) {
                conf.setNumWorkers(3);
    
                StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
            }
        }
    
    ###Linux: storm jar storm-starter-0.9.4.jar storm.starter.WordCountTopology wordcount


     

Cluster Summary(整个集群的)

Storm的优点有哪些

  • 一个slot就是一个worker,一个worker里面是一个jvm,一个worker里面呢可以有多个executor,每一个executor就是执行线程,每一个executor上面执行一个或多个Task,一般来说默认是一个task。

  • Topology Summary(每个应用程序的)

  • 一个应用程序就是一个Topology,它有名字,还有ID,然后有个状态,ACTIVE就是正在运行,KILLED就是已经被杀掉了。

  • Storm的优点有哪些

  • Topology actions就是可以对Topology采取一些操作,Deactivate就是暂停,Rebalance就是重新做一下balance,然后kill就是杀掉这个应用。

  • 这个应用运行的到底怎么样呢,在Topology stats里面有个整体的统计,有10分钟,3小时,1天,还有所有的统计,这里面比较关键的呢,是Complete latency,它的意思就是一条数据从发出去到处理完花了多长时间,第二个比较关键的呢就是ACK,这个反映的是吞吐,前面的Complete latency反映的延迟。

  • 在Spouts的统计信息里面呢,一个是spout的名字,和代码里面是对应的,第二个呢是这个spout它有多少个executor,然后呢它有多少个task,然后呢是它在一定时间内往外emit出多少数据,真正tranfer传输了多少数据,然后它latency延迟是多少,然后ACK处理了多少数据,后面还有错误的信息。

  • Bolt也类似,通过这个UI页面可以实时观看这些统计信息,是非常有用的,可以知道哪个环节比较慢,哪些地方有没有什么瓶颈了,有瓶颈了是不是加一个并发来解决问题。

  • Spout中这里最关键的是一个nextTuple(),它是从外部取数据的源头,可以从DPRC取数据,可以从MQ,比如Kafka中取数据,然后给后面的bolt进行处理,然后这里wordcount没有那么复杂,就自己随机的生成了数据。

  • _collector.emit(new Values(sentence), new Object());

  • 这个代码后面new Object()等于是随机的生成了一个message的ID,这个ID有什么用,后面会讲到,实际上它是消息可靠性保障的一部分。有了这个ID呢Storm就可以帮你去跟踪这条消息到底有没有被处理完,如果处理完了呢?

  • 如果处理完了,它就是调用一个ack告诉spout,我已经处理完了,这里ack方法里面仅仅是把id打印出来,因为这里id没有什么意义,仅仅是为了展示,相反,如果在一定时间内没有处理完,会调用fail告诉说消息处理失败了。

    /**
     * Licensed to the Apache Software Foundation (ASF) under one
     * or more contributor license agreements.  See the NOTICE file
     * distributed with this work for additional information
     * regarding copyright ownership.  The ASF licenses this file
     * to you under the Apache License, Version 2.0 (the
     * "License"); you may not use this file except in compliance
     * with the License.  You may obtain a copy of the License at
     *
     * http://www.apache.org/licenses/LICENSE-2.0
     *
     * Unless required by applicable law or agreed to in writing, software
     * distributed under the License is distributed on an "AS IS" BASIS,
     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     * See the License for the specific language governing permissions and
     * limitations under the License.
     */
    package storm.starter.spout;
    
    import backtype.storm.spout.SpoutOutputCollector;
    import backtype.storm.task.TopologyContext;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.topology.base.BaseRichSpout;
    import backtype.storm.tuple.Fields;
    import backtype.storm.tuple.Values;
    import backtype.storm.utils.Utils;
    
    import java.util.Map;
    import java.util.Random;
    
    public class RandomSentenceSpout extends BaseRichSpout {
      SpoutOutputCollector _collector;
      Random _rand;
    
    
      @Override
      public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        _collector = collector;
        _rand = new Random();
      }
    
    
      @Override
      public void nextTuple() {
        Utils.sleep(100);
        String[] sentences = new String[]{ "the cow jumped over the moon", "an apple a day keeps the doctor away",
            "four score and seven years ago", "snow white and the seven dwarfs", "i am at two with nature" };
        String sentence = sentences[_rand.nextInt(sentences.length)];
        _collector.emit(new Values(sentence), new Object());
      }
    
      @Override
      public void ack(Object id) {
    	  System.out.println(id);
      }
    
      @Override
      public void fail(Object id) {
    	  System.out.println(id);
      }
    
      @Override
      public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
      }
      @Override
      public  Map<String, Object> getComponentConfiguration(){
        return null;
    
      }
    }
    
    public static  class WordCount1 extends BaseBasicBolt{
            Map<String, Integer> counts = new HashMap<String, Integer>();
            @Override
            public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
                String word = tuple.getStringByField("word");
                Integer count = counts.get(word);
                if(count==null){
                    count=0;
                }
                count++;
                counts.put(word,count);
                System.out.println("word++"+word+"========="+count);
                basicOutputCollector.emit(new Values(word,count));
            }
    
            @Override
            public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
                outputFieldsDeclarer.declare(new Fields("word","count"));
            }
        }


     

  • 对于wordcount的示例,它是有两个blot,一个bolt是分词,一个bolt是计数,这里SplitSentence是展示它支持多语言的开发,其实这里代码调用的是python的splitsentence.py,使用的是ShellBolt这个组件

  • 那wordcount这个bolt是用java实现的,它的实现核心是亮点,一点是有execute这样一个函数,第二个是declareOutputFields这个函数,这两个函数的作用其实是很什么呢?最核心的其实是execute,execute的作用呢就是拿到输入的数据Tuple,然后再emit数据出去。

  • 以上就是在storm里面一个最简单的wordcount的例子,它的主函数的代码,它的提交的命令行代码,Spout是什么样的,Bolt是什么样的,提交到Storm集群之后是一个什么样的运行状况,在WebUI上面看到哪些核心的信息,这个在后面的应用开发里面都会大量的运用到。

Storm与其他技术对比

  • Storm:进程、线程常驻运行,数据不进入磁盘,网络传递。

  • MapReduce:TB、PB级别数据设计的,一次的批处理作业。Storm的优点有哪些

  • Storm:纯流式处理,处理数据单元是一个个Tuple。另外Storm专门为流式处理设计,它的数据传输模式更为简单,很多地方也更为高效。并不是不能做批处理,它也可以来做微批处理,来提高吞吐。

  • Spark Streaming:微批处理,一个批处理怎么做流式处理呢,它基于内存和DAG可以把处理任务做的很快,把RDD做的很小来用小的批处理来接近流式处理。Storm的优点有哪些

  • 和其它如MPI系统相比Storm的优点有哪些

通过对比,更能了解Storm的一些特点:

  • 首先,相对于Queue+Worker来说,它是一个通用的分布式系统,分布式系统的一些细节可以屏蔽掉,比如说水平扩展,容错,上层应用只需要关注自己的业务逻辑就可以了,这一点对应应用开发人员来说是非常重要的,不然的话业务逻辑会被底层的一些细节所打乱。

  • 另外一个,Storm作为一个纯的流式处理系统,和mapreduce的差异相当大,一种称为流式处理,一种称为批处理,Storm是一个常驻运行的,它的消息收发是很高效的。

  • 和spark这种微批处理系统相比呢,Storm可以处理单条单条的消息。

  • 总的来说呢,Storm在设计之初呢,就被定义为分布式的流式处理系统,所以说大部分的流式计算需求都可以通过Storm很好的满足,Storm目前在稳定性方面也做的相当不错,对于实时流式计算来说是个非常不错的选择

“Storm的优点有哪些”的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识可以关注亿速云网站,小编将为大家输出更多高质量的实用文章!

向AI问一下细节

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

AI