温馨提示×

温馨提示×

您好,登录后才能下订单哦!

密码登录×
登录注册×
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》

Transactional topology怎么使用

发布时间:2021-12-22 17:20:20 来源:亿速云 阅读:135 作者:iii 栏目:云计算

本篇内容介绍了“Transactional topology怎么使用”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!

你可以通过使用TransactionalTopologyBuilder来创建transactional topology. 下面就是一个transactional topology的定义, 它的作用是计算输入流里面的tuple的个数。这段代码来自storm-starter里面的TransactionalGlobalCount。

MemoryTransactionalSpout spout = new MemoryTransactionalSpout(DATA, new Fields("word"), PARTITION_TAKE_PER_BATCH);

TransactionalTopologyBuilder builder = new TransactionalTopologyBuilder("global-count", "spout", spout, 3);

builder.setBolt("partial-count", new BatchCount(), 5)

        .shuffleGrouping("spout");

builder.setBolt("sum", new UpdateGlobalCount())

        .globalGrouping("partial-count");

TransactionalTopologyBuilder构造器中接受如下的参数:

一个transaction topology的id

spout在整个topology里面的id。

一个transactional spout。

一个可选的这个transactional spout的并行度。

topology的id是用来在zookeeper里面保存这个topology的当前进度状态的,所以如果你重启这个topology, 它可以接着前面的进度继续执行。

一个transaction topology里面有一个唯一的TransactionalSpout, 这个spout是通过TransactionalTopologyBuilder的构造函数来指定的。在这个例子里面,MemoryTransactionalSpout被用来从一个内存变量里面读取数据(DATA)。第二个参数指定spout发送的tuple的字段, 第三个参数指定每个batch的最大tuple数量。关于如何自定义TransactionalSpout我们会在后面介绍。

现在说说 bolts。这个topology并行地计算tuple的总数量。第一个bolt:BatchBolt,随机地把输入tuple分给各个task,然后各个task各自统计局部数量。第二个bolt:UpdateGlobalCount, 用全局grouping来汇总这个batch中tuple的数量,然后再更新到数据库里面的全局数量。

下面是BatchCount的定义:

public static class BatchCount extends BaseBatchBolt {

    Object _id;

    BatchOutputCollector _collector;

    int _count = 0;

    @Override

    public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, Object id) {

        _collector = collector;

        _id = id;

    }

    @Override

    public void execute(Tuple tuple) {

        _count++;

    }

    @Override

    public void finishBatch() {

        _collector.emit(new Values(_id, _count));

    }

    @Override

    public void declareOutputFields(OutputFieldsDeclarer declarer) {

        declarer.declare(new Fields("id", "count"));

    }

}

storm会为每个正在处理的batch创建一个BatchCount对象,这个BatchCount是运行在BatchBoltExecutor里面的。而BatchBoltExecutor负责创建以及清理这个对象的实例。

BatchCount对象的prepare方法接收如下参数:

Storm config

Topology context

Output collector

这个batch的id (txid),在Transactional Topology中, 这个id则是一个TransactionAttempt对象。

这个batch bolt的抽象在DRPC里面也可以用, 只是txid的类型不一样而已。实际上,BatchBolt可以接收一个txid类型的参数,所以如果你只是想在transactioinal topology里面使用这个BatchBolt,你可以去继承BaseTransactionalBolt类,如下定义:

public abstract class BaseTransactionalBolt extends BaseBatchBolt<TransactionAttempt> {

}

在transaction topology里面发射的所有的tuple都必须以TransactionAttempt作为第一个field, 然后storm可以根据这个field来判断哪些tuple属于一个batch。所以你在发射tuple的时候需要满足这个条件。

TransactionAttempt包含两个值: 一个transaction id,一个attempt id。transaction id的作用就是我们上面介绍的对于每个batch是唯一的,而且不管这个batch 被replay多少次都是一样的。attempt id是对于每个batch唯一的一个id, 但是对于同一个batch,它replay之后的attempt id跟replay之前就不一样了, 我们可以把attempt id理解成replay-times, storm利用这个id来区别一个batch发射的tuple的不同版本。

transaction id对于每个batch加一, 所以第一个batch的transaction id是”1″, 第二个batch是”2″,依次类推。

每收到一个batch中的tuple,execute方法便被调用一次。每次当该方法被调用时,你应该把这个batch里面的状态保持在一个本地变量里面。对于这个例子来说, 它在execute方法里面递增tuple的个数。

最后, 当这个bolt接收到某个batch的所有的tuple之后, finishBatch方法会被调用。这个例子里面的BatchCount类会在这个时候发射它的局部数量到它的输出流里面去。

下面是UpdateGlobalCount类的定义:

public static class UpdateGlobalCount extends BaseTransactionalBolt implements ICommitter {

    TransactionAttempt _attempt;

    BatchOutputCollector _collector;

    int _sum = 0;

    @Override

    public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, TransactionAttempt attempt) {

        _collector = collector;

        _attempt = attempt;

    }

    @Override

    public void execute(Tuple tuple) {

        _sum+=tuple.getInteger(1);

    }

    @Override

    public void finishBatch() {

        Value val = DATABASE.get(GLOBAL_COUNT_KEY);

        Value newval;

        if(val == null || !val.txid.equals(_attempt.getTransactionId())) {

            newval = new Value();

            newval.txid = _attempt.getTransactionId();

            if(val==null) {

                newval.count = _sum;

            } else {

                newval.count = _sum + val.count;

            }

            DATABASE.put(GLOBAL_COUNT_KEY, newval);

        } else {

            newval = val;

        }

        _collector.emit(new Values(_attempt, newval.count));

    }

    @Override

    public void declareOutputFields(OutputFieldsDeclarer declarer) {

        declarer.declare(new Fields("id", "sum"));

    }

}

UpdateGlobalCount是Transactional Topologies相关的类,所以它继承自BaseTransactionalBolt。在execute方法里面, UpdateGlobalCount累积这个batch的计数, 比较有趣的是finishBatch方法。

首先, 注意这个bolt实现了ICommitter接口,这告诉storm要在这个事务的commit阶段调用finishBatch方法,所以对于finishBatch的调用会保证强顺序性(顺序就是transaction id的升序),另一方面execute方法在processing或者commit阶段都可以执行。另外一种把bolt标识为commiter的方法是调用TransactionalTopologyBuilder的setCommiterBolt来添加Bolt(而不是setBolt)。

UpdateGlobalCount里面finishBatch方法的逻辑是首先从数据库中获取当前的值,并且把数据库里面的transaction id与当前这个batch的transaction id进行比较。如果他们一样, 那么忽略这个batch。否则把这个batch的结果加到总结果里面去,并且更新数据库。

“Transactional topology怎么使用”的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识可以关注亿速云网站,小编将为大家输出更多高质量的实用文章!

向AI问一下细节

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

AI