public class KafkaTopo { public static void main(String[] args) { String zkRoot = "/kafka-storm"; String spoutId = "KafkaSpout"; BrokerHosts brokerHosts = new ZkHosts("m2:2181,m7:2181,m8:2181"); SpoutConfig spoutConfig = new SpoutConfig(brokerHosts, "test2", zkRoot, spoutId); // spoutConfig.forceFromStart = true; spoutConfig.scheme = new SchemeAsMultiScheme(new MessageScheme()); TopologyBuilder builder = new TopologyBuilder(); //设置一个spout用来从kaflka消息队列中读取数据并发送给下一级的bolt组件,此处用的spout组件并非自定义的,而是storm中已经开发好的KafkaSpout builder.setSpout("KafkaSpout", new KafkaSpout(spoutConfig)); builder.setBolt("UpperBolt", new UpperBolt()).shuffleGrouping("KafkaSpout"); builder.setBolt("ExtBolt", new ExtBolt(), 4).fieldsGrouping("UpperBolt", new Fields("name")); Config conf = new Config(); conf.setNumWorkers(4); conf.setNumAckers(0); conf.setDebug(false); //LocalCluster用来将topology提交到本地模拟器运行,方便开发调试 LocalCluster cluster = new LocalCluster(); cluster.submitTopology("WordCount", conf, builder.createTopology()); //提交topology到storm集群中运行// StormSubmitter.submitTopology("sufei-topo", conf, builder.createTopology()); } }