Storm starter - Overview
Published: 2019-06-13


The storm-starter examples are genuinely substantial: rather than being mere demos, they can be used directly in real-world scenarios.

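They also provide some very useful tools, such as SlidingWindowCounter and TimeCacheMap, so storm-starter effectively offers a framework for programming on Storm and is worth studying carefully.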

 

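ExclamationTopology: a basic topology

Nothing special here, just a standard example. TestWordSpout emits random words, and two chained ExclamationBolts each append "!!!" to them.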

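```java
// Imports assume the Storm 0.8.x package layout (backtype.storm.*); Storm 1.0+ renamed it to org.apache.storm.*
import java.util.Map;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.testing.TestWordSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;

/**
 * This is a basic example of a Storm topology.
 */
public class ExclamationTopology {

    public static class ExclamationBolt extends BaseRichBolt {
        OutputCollector _collector;

        @Override
        public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
            _collector = collector;
        }

        @Override
        public void execute(Tuple tuple) {
            // Anchor the new tuple to the input tuple, then ack the input
            _collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
            _collector.ack(tuple);
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word"));
        }
    }

    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();

        builder.setSpout("word", new TestWordSpout(), 10);
        builder.setBolt("exclaim1", new ExclamationBolt(), 3)
                .shuffleGrouping("word");
        builder.setBolt("exclaim2", new ExclamationBolt(), 2)
                .shuffleGrouping("exclaim1");

        Config conf = new Config();
        conf.setDebug(true);

        if (args != null && args.length > 0) {
            // Cluster mode: the first argument is the topology name
            conf.setNumWorkers(3);
            StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
        } else {
            // Local mode: run for 10 seconds, then shut down
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("test", conf, builder.createTopology());
            Utils.sleep(10000);
            cluster.killTopology("test");
            cluster.shutdown();
        }
    }
}
```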

 

RollingTopWords

Implements Top-N ranking over a sliding-window count.

The bolt implementations in this example are very instructive.
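To make the sliding-window and Top-N idea concrete, here is a minimal sketch in plain Java with no Storm dependency. It assumes the window is split into a fixed number of slots, one per emit interval: increments go into the current slot, and each time the totals are read the oldest slot is cleared and reused as the new current slot. The names (SlidingWindowSketch, getCountsThenAdvanceWindow, topN) are hypothetical and only loosely follow the idea behind storm-starter's SlidingWindowCounter; this is not its actual code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified sketch (not storm-starter's code): a slot-based sliding-window
// counter plus a top-N ranking, in plain Java with no Storm dependency.
class SlidingWindowSketch {

    /** Counts objects over the last numSlots "ticks"; one tick corresponds to one emit interval. */
    static class SlidingWindowCounter<T> {
        private final Map<T, long[]> slotCounts = new HashMap<>();
        private final int numSlots;
        private int headSlot = 0; // slot that currently receives increments

        SlidingWindowCounter(int numSlots) {
            if (numSlots < 2) throw new IllegalArgumentException("numSlots must be >= 2");
            this.numSlots = numSlots;
        }

        void incrementCount(T obj) {
            slotCounts.computeIfAbsent(obj, k -> new long[numSlots])[headSlot]++;
        }

        /** Returns each object's total over the whole window, then slides the window by one slot. */
        Map<T, Long> getCountsThenAdvanceWindow() {
            Map<T, Long> totals = new HashMap<>();
            for (Map.Entry<T, long[]> e : slotCounts.entrySet()) {
                long sum = 0;
                for (long c : e.getValue()) sum += c;
                totals.put(e.getKey(), sum);
            }
            // The slot after the head holds the oldest counts: clear it and reuse it as the new head.
            headSlot = (headSlot + 1) % numSlots;
            for (long[] slots : slotCounts.values()) slots[headSlot] = 0;
            // Forget objects that no longer have any count inside the window.
            slotCounts.values().removeIf(slots -> Arrays.stream(slots).allMatch(c -> c == 0));
            return totals;
        }
    }

    /** Returns the n entries with the highest counts, largest first. */
    static <T> List<Map.Entry<T, Long>> topN(Map<T, Long> counts, int n) {
        List<Map.Entry<T, Long>> entries = new ArrayList<>(counts.entrySet());
        entries.sort(Map.Entry.<T, Long>comparingByValue().reversed());
        return new ArrayList<>(entries.subList(0, Math.min(n, entries.size())));
    }

    public static void main(String[] args) {
        SlidingWindowCounter<String> window = new SlidingWindowCounter<>(3);
        String[][] ticks = {{"storm", "storm", "java"}, {"java", "kafka"}, {"storm"}, {}};
        for (String[] words : ticks) {
            for (String w : words) window.incrementCount(w);
            System.out.println(topN(window.getCountsThenAdvanceWindow(), 2));
        }
    }
}
```

In a topology, a counting bolt would call incrementCount for every incoming word and, on a periodic tick, call getCountsThenAdvanceWindow and hand the totals to a ranking step such as topN. With 3 slots, an increment contributes to exactly three consecutive readings before it drops out of the window.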

 

SingleJoinExample

Uses TimeCacheMap to implement an in-memory join.
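A minimal single-threaded sketch of the idea: pending entries are kept in a fixed number of buckets, and each rotation drops the oldest bucket and reports its entries as expired, so a half-finished join is discarded if its other half does not arrive within numBuckets rotations. RotatingCacheMap and GenderAgeJoiner are hypothetical names, and the "gender"/"age" fields are only illustrative. As a simplification, rotate() is called by hand here, whereas Storm's TimeCacheMap rotates on a timer.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

// Hypothetical, single-threaded sketch of a TimeCacheMap-style expiring map and an in-memory join.
// Simplification: rotate() is called by hand instead of by a timer thread.
class InMemoryJoinSketch {

    /** Keeps entries in a fixed number of buckets; rotate() expires the oldest bucket. */
    static class RotatingCacheMap<K, V> {
        private final LinkedList<Map<K, V>> buckets = new LinkedList<>();
        private final BiConsumer<K, V> onExpire;

        RotatingCacheMap(int numBuckets, BiConsumer<K, V> onExpire) {
            if (numBuckets < 2) throw new IllegalArgumentException("numBuckets must be >= 2");
            for (int i = 0; i < numBuckets; i++) buckets.add(new HashMap<>());
            this.onExpire = onExpire;
        }

        void put(K key, V value) {
            // A key lives in exactly one bucket: drop older copies, then write to the newest bucket.
            for (Map<K, V> b : buckets) b.remove(key);
            buckets.getFirst().put(key, value);
        }

        V get(K key) {
            for (Map<K, V> b : buckets) {
                if (b.containsKey(key)) return b.get(key);
            }
            return null;
        }

        V remove(K key) {
            for (Map<K, V> b : buckets) {
                if (b.containsKey(key)) return b.remove(key);
            }
            return null;
        }

        /** Drops the oldest bucket, reports its entries as expired, and opens a fresh newest bucket. */
        void rotate() {
            Map<K, V> expired = buckets.removeLast();
            buckets.addFirst(new HashMap<>());
            expired.forEach(onExpire);
        }
    }

    /** Joins a "gender" record and an "age" record sharing an id, if both arrive before expiry. */
    static class GenderAgeJoiner {
        final List<String> joined = new ArrayList<>();
        final List<Integer> expiredIds = new ArrayList<>();
        final RotatingCacheMap<Integer, Map<String, Object>> pending =
                new RotatingCacheMap<>(2, (id, parts) -> expiredIds.add(id));

        void receive(int id, String field, Object value) {
            Map<String, Object> parts = pending.get(id);
            if (parts == null) {
                parts = new HashMap<>();
                pending.put(id, parts); // the expiry clock starts when the first half arrives
            }
            parts.put(field, value);
            if (parts.containsKey("gender") && parts.containsKey("age")) {
                pending.remove(id);
                joined.add(id + ":" + parts.get("gender") + "," + parts.get("age"));
            }
        }
    }

    public static void main(String[] args) {
        GenderAgeJoiner j = new GenderAgeJoiner();
        j.receive(1, "gender", "male");
        j.receive(1, "age", 25);
        j.receive(2, "gender", "female"); // the matching "age" record never arrives
        j.pending.rotate();
        j.pending.rotate();
        System.out.println("joined=" + j.joined + " expired=" + j.expiredIds);
    }
}
```

Holding both halves in memory is what keeps the join cheap; the price is that a record whose partner arrives too late is simply dropped (in a real bolt, its tuple would be failed so it can be replayed).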

 

BasicDRPCTopology, ReachTopology

Examples of DRPC (distributed RPC).

 

TransactionalGlobalCount, TransactionalWords

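Examples of transactional topologies.

TransactionalGlobalCount is fairly simple, so let's look at TransactionalWords.

On top of counting words, it also keeps statistics on how the word counts are distributed.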

```java
public static Map<String, CountValue> COUNT_DATABASE = new HashMap<String, CountValue>();
public static Map<Integer, BucketValue> BUCKET_DATABASE = new HashMap<Integer, BucketValue>();
```
COUNT_DATABASE records the count of each word.
BUCKET_DATABASE records the distribution of word counts, for example how many words have appeared 0-9 times, how many 10-19 times, and so on.
 
```java
public static class KeyedCountUpdater extends BaseTransactionalBolt implements ICommitter
```

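KeyedCountUpdater is not much different from the simpler example: in execute it counts the words, and in finishBatch it commits the counts directly to COUNT_DATABASE.

It emits new Fields("id", "key", "count", "prev-count"). The other fields are self-explanatory, but why is prev-count needed? When updating BUCKET_DATABASE we must know whether the word has moved to a different bucket, and that requires its previous count.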
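The following plain-Java sketch models what a KeyedCountUpdater-style finishBatch has to do to produce prev-count. It assumes, as is usual for transactional topologies, that each stored value also records the id of the transaction that last wrote it: if a batch is replayed with the same txid, the stored value is reused instead of being incremented again, so prev-count still refers to the count before that transaction. CountValue, COUNT_DATABASE and commitBatch here are illustrative stand-ins, not Storm APIs.

```java
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical plain-Java sketch of a KeyedCountUpdater-style commit (no Storm classes).
// Assumption: each stored value remembers the txid that last wrote it, so replays are not double-counted.
class KeyedCountCommitSketch {

    static class CountValue {
        Integer prevCount = null; // count before the last committed transaction (null for a new word)
        int count = 0;
        BigInteger txid = null;   // transaction that produced this value
    }

    static final Map<String, CountValue> COUNT_DATABASE = new HashMap<>();

    /** Commits one batch of per-word counts; returns {key, count, prevCount} rows to emit downstream. */
    static List<Object[]> commitBatch(BigInteger txid, Map<String, Integer> batchCounts) {
        List<Object[]> rows = new ArrayList<>();
        for (Map.Entry<String, Integer> e : batchCounts.entrySet()) {
            CountValue stored = COUNT_DATABASE.get(e.getKey());
            CountValue result;
            if (stored == null || !txid.equals(stored.txid)) {
                // First commit of this txid for the key: remember the old count, then add the delta.
                result = new CountValue();
                result.txid = txid;
                if (stored != null) {
                    result.prevCount = stored.count;
                    result.count = stored.count;
                }
                result.count += e.getValue();
                COUNT_DATABASE.put(e.getKey(), result);
            } else {
                // Replay of an already committed txid: reuse the stored value unchanged.
                result = stored;
            }
            rows.add(new Object[] {e.getKey(), result.count, result.prevCount});
        }
        return rows;
    }

    public static void main(String[] args) {
        Map<String, Integer> batch = new HashMap<>();
        batch.put("storm", 3);
        commitBatch(BigInteger.ONE, batch);
        commitBatch(BigInteger.ONE, batch); // replayed batch: no double counting
        batch.put("storm", 8);
        for (Object[] row : commitBatch(BigInteger.valueOf(2), batch)) {
            System.out.println(Arrays.toString(row));
        }
    }
}
```

A new word comes out with a null prev-count, which is exactly the case the bucketing step below treats as "new word".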

 

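Bucketize computes which bucket a word belongs to as count / BUCKET_SIZE:

If the word is new, add 1 to its bucket.
If the word has moved to a different bucket, add 1 to the new bucket and subtract 1 from the old one.
If the bucket has not changed, emit nothing.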

```java
public static class Bucketize extends BaseBasicBolt {
    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        TransactionAttempt attempt = (TransactionAttempt) tuple.getValue(0);
        int curr = tuple.getInteger(2);
        Integer prev = tuple.getInteger(3); // null when the word is new

        int currBucket = curr / BUCKET_SIZE;
        Integer prevBucket = null;
        if (prev != null) {
            prevBucket = prev / BUCKET_SIZE;
        }

        if (prevBucket == null) {
            // New word: +1 in its bucket
            collector.emit(new Values(attempt, currBucket, 1));
        } else if (currBucket != prevBucket) {
            // Word moved: +1 in the new bucket, -1 in the old one
            collector.emit(new Values(attempt, currBucket, 1));
            collector.emit(new Values(attempt, prevBucket, -1));
        }
        // Same bucket: emit nothing
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("attempt", "bucket", "delta"));
    }
}
```

BucketCountUpdater then applies these bucket deltas to BUCKET_DATABASE.
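To see how the deltas add up, here is a plain-Java sketch of the Bucketize rules followed by a BucketCountUpdater-style step that sums a batch's deltas per bucket and applies them to an in-memory store. It assumes BUCKET_SIZE = 10 (matching the 0-9, 10-19 example above) and omits any transaction-id bookkeeping; BucketSketch, bucketDeltas and applyBatch are hypothetical names.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical plain-Java sketch of Bucketize + BucketCountUpdater (no Storm classes, no txids).
class BucketSketch {
    static final int BUCKET_SIZE = 10; // assumed: counts 0-9 -> bucket 0, 10-19 -> bucket 1, ...

    /** Bucketize rules: returns {bucket, delta} pairs for one word's (count, prevCount) update. */
    static List<int[]> bucketDeltas(int curr, Integer prev) {
        List<int[]> deltas = new ArrayList<>();
        int currBucket = curr / BUCKET_SIZE;
        if (prev == null) {
            deltas.add(new int[] {currBucket, 1});       // new word
        } else {
            int prevBucket = prev / BUCKET_SIZE;
            if (currBucket != prevBucket) {              // word moved to another bucket
                deltas.add(new int[] {currBucket, 1});
                deltas.add(new int[] {prevBucket, -1});
            }                                            // same bucket: nothing to emit
        }
        return deltas;
    }

    /** BucketCountUpdater-style step: sum the batch's deltas per bucket, then apply them to the store. */
    static void applyBatch(Map<Integer, Integer> bucketDb, List<int[]> batchDeltas) {
        Map<Integer, Integer> accum = new HashMap<>();
        for (int[] d : batchDeltas) accum.merge(d[0], d[1], Integer::sum);
        accum.forEach((bucket, delta) -> bucketDb.merge(bucket, delta, Integer::sum));
    }

    public static void main(String[] args) {
        Map<Integer, Integer> bucketDb = new HashMap<>();
        List<int[]> batch1 = new ArrayList<>();
        batch1.addAll(bucketDeltas(4, null));   // new word with count 4
        batch1.addAll(bucketDeltas(12, null));  // new word with count 12
        applyBatch(bucketDb, batch1);
        List<int[]> batch2 = new ArrayList<>();
        batch2.addAll(bucketDeltas(15, 4));     // moves from bucket 0 to bucket 1
        batch2.addAll(bucketDeltas(13, 12));    // stays in bucket 1
        applyBatch(bucketDb, batch2);
        System.out.println(bucketDb);
    }
}
```

After the two batches, bucket 0 holds 0 words and bucket 1 holds 2: the word going from 4 to 15 produced +1 for bucket 1 and -1 for bucket 0, while the word going from 12 to 13 produced nothing.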

The topology is defined as follows:

```java
MemoryTransactionalSpout spout = new MemoryTransactionalSpout(DATA, new Fields("word"), PARTITION_TAKE_PER_BATCH);
TransactionalTopologyBuilder builder = new TransactionalTopologyBuilder("top-n-words", "spout", spout, 2);
builder.setBolt("count", new KeyedCountUpdater(), 5)
        .fieldsGrouping("spout", new Fields("word"));
builder.setBolt("bucketize", new Bucketize())
        .noneGrouping("count");
builder.setBolt("buckets", new BucketCountUpdater(), 5)
        .fieldsGrouping("bucketize", new Fields("bucket"));
```

 

WordCountTopology: multi-language support

ShellBolt and BaseBasicBolt are used to declare bolts implemented in Python and in Java, respectively.

```java
public static class SplitSentence extends ShellBolt implements IRichBolt {

    public SplitSentence() {
        // Runs "python splitsentence.py" as a subprocess speaking Storm's multilang protocol
        super("python", "splitsentence.py");
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}

public static class WordCount extends BaseBasicBolt {
    // Per-task running counts; correct only because of fieldsGrouping on "word"
    Map<String, Integer> counts = new HashMap<String, Integer>();

    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        String word = tuple.getString(0);
        Integer count = counts.get(word);
        if (count == null)
            count = 0;
        count++;
        counts.put(word, count);
        collector.emit(new Values(word, count));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word", "count"));
    }
}
```

When defining the topology, ShellBolt-based and BaseBasicBolt-based bolts can be mixed freely, which is very convenient:

```java
TopologyBuilder builder = new TopologyBuilder();

builder.setSpout("spout", new RandomSentenceSpout(), 5);

builder.setBolt("split", new SplitSentence(), 8)
        .shuffleGrouping("spout");
builder.setBolt("count", new WordCount(), 12)
        .fieldsGrouping("split", new Fields("word"));
```
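Fields grouping is what makes the local HashMap in WordCount correct: every tuple with the same "word" value goes to the same WordCount task, so each task holds the complete count for its words (with shuffleGrouping, one word's count would be split across several tasks). The sketch below models this with a simple hash-mod partitioner in plain Java; chooseTask and route are hypothetical helpers, and Storm's actual hashing code may differ between versions.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified model of fields grouping (assumption: hash-mod partitioning on the grouping field).
class FieldsGroupingSketch {

    /** Picks a target task from the grouping field's value; equal values always map to the same task. */
    static int chooseTask(Object fieldValue, int numTasks) {
        return Math.floorMod(fieldValue.hashCode(), numTasks);
    }

    /** Routes words to numTasks independent per-task counters, like WordCount's local HashMap. */
    static List<Map<String, Integer>> route(List<String> words, int numTasks) {
        List<Map<String, Integer>> taskCounts = new ArrayList<>();
        for (int i = 0; i < numTasks; i++) taskCounts.add(new HashMap<>());
        for (String w : words) {
            taskCounts.get(chooseTask(w, numTasks)).merge(w, 1, Integer::sum);
        }
        return taskCounts;
    }

    public static void main(String[] args) {
        List<String> words = Arrays.asList("the cow jumped over the moon the end".split(" "));
        List<Map<String, Integer>> tasks = route(words, 12);
        for (int i = 0; i < tasks.size(); i++) {
            if (!tasks.get(i).isEmpty()) System.out.println("task " + i + " -> " + tasks.get(i));
        }
    }
}
```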

Reposted from: https://www.cnblogs.com/fxjwind/archive/2013/05/24/3097417.html
