博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
storm kafka整合
阅读量:4343 次
发布时间:2019-06-07

本文共 1183 字,大约阅读时间需要 3 分钟。

public class KafkaTopo {		public static void main(String[] args) {		String zkRoot = "/kafka-storm";		String spoutId = "KafkaSpout";		BrokerHosts brokerHosts = new ZkHosts("m2:2181,m7:2181,m8:2181"); 		SpoutConfig spoutConfig = new SpoutConfig(brokerHosts, "test2", zkRoot, spoutId);		// spoutConfig.forceFromStart = true;		spoutConfig.scheme = new SchemeAsMultiScheme(new MessageScheme());		TopologyBuilder builder = new TopologyBuilder();		//设置一个spout用来从kaflka消息队列中读取数据并发送给下一级的bolt组件,此处用的spout组件并非自定义的,而是storm中已经开发好的KafkaSpout		builder.setSpout("KafkaSpout", new KafkaSpout(spoutConfig));		builder.setBolt("UpperBolt", new UpperBolt()).shuffleGrouping("KafkaSpout");		builder.setBolt("ExtBolt", new ExtBolt(), 4).fieldsGrouping("UpperBolt", new Fields("name"));		Config conf = new Config();		conf.setNumWorkers(4);		conf.setNumAckers(0);		conf.setDebug(false);				//LocalCluster用来将topology提交到本地模拟器运行,方便开发调试		LocalCluster cluster = new LocalCluster();		cluster.submitTopology("WordCount", conf, builder.createTopology());				//提交topology到storm集群中运行//		StormSubmitter.submitTopology("sufei-topo", conf, builder.createTopology());	}	}

  

转载于:https://www.cnblogs.com/heml/p/6074960.html

你可能感兴趣的文章
MySQL创建计算字段
查看>>
Software--Project--Alpha_20180108
查看>>
评论列表显示及排序,个人中心显示
查看>>
Linux内核分析——计算机是如何工作的
查看>>
php调用远程url的六种方法
查看>>
for嵌套
查看>>
Name control
查看>>
解决VS2015安装Android SDK 后文件不全及更新问题
查看>>
辣鸡咯..
查看>>
(2018干货系列一)最新Java学习路线整合
查看>>
django 快速搭建blog
查看>>
Chrome插件:本地程序实现验证码破解(浏览器与本地进程通信)
查看>>
学习的态度!
查看>>
小组成员名单()
查看>>
[Javascirpt] What’s new in JavaScript (Google I/O ’19)
查看>>
[Angular 2] Writing a Simple Angular 2 Component
查看>>
可能会用的到的JQ插件
查看>>
高斯消元模板
查看>>
【GPS】SAP测试GPS模块拿不到sensor数据
查看>>
python 数据类型之列表、元组、字典、集合
查看>>