kafka+storm+hbase详解编程语言

kafka+storm+hbase实现计算WordCount。

(1)表名:wc

(2)列族:result

(3)RowKey:word

(4)Field:count

 

1、解决:

(1)第一步:首先准备kafka、storm、hbase相关的jar包,依赖如下:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> 
  <modelVersion>4.0.0</modelVersion> 
  <groupId>com</groupId> 
  <artifactId>kafkaSpout</artifactId> 
  <version>0.0.1-SNAPSHOT</version> 
   
	<dependencies> 
        <dependency> 
            <groupId>org.apache.storm</groupId> 
            <artifactId>storm-core</artifactId> 
            <version>0.9.3</version> 
        </dependency> 
        <dependency> 
            <groupId>org.apache.storm</groupId> 
            <artifactId>storm-kafka</artifactId> 
            <version>0.9.3</version> 
        </dependency> 
        <dependency> 
            <groupId>org.apache.kafka</groupId> 
            <artifactId>kafka_2.10</artifactId> 
            <version>0.8.1.1</version> 
            <exclusions> 
                <exclusion> 
                    <groupId>org.apache.zookeeper</groupId> 
                    <artifactId>zookeeper</artifactId> 
                </exclusion> 
                <exclusion> 
                    <groupId>log4j</groupId> 
                    <artifactId>log4j</artifactId> 
                </exclusion> 
            </exclusions> 
        </dependency> 
        <dependency> 
            <groupId>org.apache.hbase</groupId> 
            <artifactId>hbase-client</artifactId> 
            <version>0.99.2</version> 
            <exclusions> 
                <exclusion> 
                    <groupId>org.slf4j</groupId> 
                    <artifactId>slf4j-log4j12</artifactId> 
                </exclusion> 
                <exclusion> 
                    <groupId>org.apache.zookeeper</groupId> 
                    <artifactId>zookeeper</artifactId> 
                </exclusion> 
            </exclusions> 
        </dependency> 
         
       <dependency> 
 
         <groupId>com.google.protobuf</groupId> 
 
         <artifactId>protobuf-java</artifactId> 
 
         <version>2.5.0</version> 
 
        </dependency> 
 
        <dependency> 
            <groupId>org.apache.curator</groupId> 
            <artifactId>curator-framework</artifactId> 
            <version>2.5.0</version> 
            <exclusions> 
                <exclusion> 
                    <groupId>log4j</groupId> 
                    <artifactId>log4j</artifactId> 
                </exclusion> 
                <exclusion> 
                    <groupId>org.slf4j</groupId> 
                    <artifactId>slf4j-log4j12</artifactId> 
                </exclusion> 
            </exclusions> 
        </dependency> 
 
 
 
			 
			 
			 
			 
			  
         
           <dependency>
               <!-- tools.jar is located relative to the JDK that runs Maven rather than through a
                    machine-specific absolute path. This layout applies to JDK 8 and earlier, where
                    java.home points at the jre directory inside the JDK installation. -->
               <groupId>jdk.tools</groupId>
               <artifactId>jdk.tools</artifactId>
               <version>1.7</version>
               <scope>system</scope>
               <systemPath>${java.home}/../lib/tools.jar</systemPath>
           </dependency>
         
    </dependencies> 
 
 
    <repositories> 
        <repository>
            <!-- Maven Central must be accessed over HTTPS; plain-HTTP requests are rejected. -->
            <id>central</id>
            <url>https://repo1.maven.org/maven2/</url>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
            <releases>
                <enabled>true</enabled>
            </releases>
        </repository>
        <repository> 
            <id>clojars</id> 
            <url>https://clojars.org/repo/</url> 
            <snapshots> 
                <enabled>true</enabled> 
            </snapshots> 
            <releases> 
                <enabled>true</enabled> 
            </releases> 
        </repository> 
        <repository> 
            <id>scala-tools</id> 
            <url>http://scala-tools.org/repo-releases</url> 
            <snapshots> 
                <enabled>true</enabled> 
            </snapshots> 
            <releases> 
                <enabled>true</enabled> 
            </releases> 
        </repository> 
        <repository> 
            <id>conjars</id> 
            <url>http://conjars.org/repo/</url> 
            <snapshots> 
                <enabled>true</enabled> 
            </snapshots> 
            <releases> 
                <enabled>true</enabled> 
            </releases> 
        </repository> 
    </repositories> 
 
    <build> 
        <plugins> 
            <plugin> 
                <groupId>org.apache.maven.plugins</groupId> 
                <artifactId>maven-compiler-plugin</artifactId> 
                <version>3.1</version> 
                <configuration> 
                    <source>1.6</source> 
                    <target>1.6</target> 
                    <encoding>UTF-8</encoding> 
                    <showDeprecation>true</showDeprecation> 
                    <showWarnings>true</showWarnings> 
                </configuration> 
            </plugin> 
            <plugin> 
                <artifactId>maven-assembly-plugin</artifactId> 
                <configuration> 
                    <descriptorRefs> 
                        <descriptorRef>jar-with-dependencies</descriptorRef> 
                    </descriptorRefs> 
                    <archive> 
                        <manifest> 
                            <mainClass></mainClass> 
                        </manifest> 
                    </archive> 
                </configuration> 
                <executions> 
                    <execution> 
                        <id>make-assembly</id> 
                        <phase>package</phase> 
                        <goals> 
                            <goal>single</goal> 
                        </goals> 
                    </execution> 
                </executions> 
            </plugin> 
        </plugins> 
    </build> 
</project>

 

 

(2)kafka发来的数据先由LevelSplit这个Bolt按空格切分成单词,然后再发送到下一个Bolt中。代码如下:

 

package com.kafka.spout; 
 
import java.util.regex.Matcher; 
import java.util.regex.Pattern; 
import backtype.storm.topology.BasicOutputCollector; 
import backtype.storm.topology.OutputFieldsDeclarer; 
import backtype.storm.topology.base.BaseBasicBolt; 
import backtype.storm.tuple.Fields; 
import backtype.storm.tuple.Tuple; 
import backtype.storm.tuple.Values; 
 
 
public class LevelSplit extends BaseBasicBolt { 
 
 
    public void execute(Tuple tuple, BasicOutputCollector collector) { 
        String words = tuple.getString(0).toString();//the cow jumped over the moon 
        String []va=words.split(" "); 
        for(String word : va) 
        { 
        	collector.emit(new Values(word)); 
        } 
		 
    } 
 
   
    public void declareOutputFields(OutputFieldsDeclarer declarer) { 
        declarer.declare(new Fields("word")); 
    } 
 
} 

  

(3)LevelSplit这个Bolt发来的单词在LevelCount这个Bolt中进行计数处理,然后把结果发送到写入hbase的Bolt中。代码如下:

 

package com.kafka.spout; 
 
import java.util.HashMap; 
import java.util.Map; 
import java.util.Map.Entry; 
 
import backtype.storm.topology.BasicOutputCollector; 
import backtype.storm.topology.OutputFieldsDeclarer; 
import backtype.storm.topology.base.BaseBasicBolt; 
import backtype.storm.tuple.Fields; 
import backtype.storm.tuple.Tuple; 
import backtype.storm.tuple.Values; 
 
 
public class LevelCount extends BaseBasicBolt { 
    Map<String, Integer> counts = new HashMap<String, Integer>(); 
 
	public void execute(Tuple tuple, BasicOutputCollector collector) { 
		// TODO Auto-generated method stub 
		String word = tuple.getString(0); 
        Integer count = counts.get(word); 
        if (count == null) 
            count = 0; 
        count++; 
        counts.put(word, count); 
 
		for (Entry<String, Integer> e : counts.entrySet()) { 
			//sum += e.getValue(); 
			System.out.println(e.getKey() 
								+ "----------->" +e.getValue()); 
        } 
        collector.emit(new Values(word, count));       
	} 
 
	public void declareOutputFields(OutputFieldsDeclarer declarer) { 
		// TODO Auto-generated method stub 
		 declarer.declare(new Fields("word", "count")); 
	} 
} 

  

(4)准备连接kafka和hbase所需的配置,设置整个拓扑结构并提交拓扑。代码如下:

 

package com.kafka.spout; 
 
 
import java.util.HashMap; 
import java.util.Map; 
 
import com.google.common.collect.Maps; 
 
//import org.apache.storm.guava.collect.Maps; 
 
 
import backtype.storm.Config; 
import backtype.storm.LocalCluster; 
import backtype.storm.StormSubmitter; 
import backtype.storm.generated.AlreadyAliveException; 
import backtype.storm.generated.InvalidTopologyException; 
import backtype.storm.spout.SchemeAsMultiScheme; 
import backtype.storm.topology.TopologyBuilder; 
import backtype.storm.tuple.Fields; 
import backtype.storm.utils.Utils; 
import storm.kafka.BrokerHosts; 
import storm.kafka.KafkaSpout; 
import storm.kafka.SpoutConfig; 
import storm.kafka.ZkHosts; 
 
 
 
/**
 * Entry point that wires the word-count topology (Kafka spout, split bolt, count bolt,
 * HBase bolt) and submits it.
 *
 * <p>When at least one command-line argument is given, the topology is submitted to the
 * cluster under the name {@code args[0]}. Without arguments it runs inside an in-process
 * {@link LocalCluster} under the name {@code "Topotest1121"}.
 */
public class StormKafkaTopo {

    /**
     * Builds and submits the topology.
     *
     * @param args optional; {@code args[0]} is the topology name used for cluster submission
     * @throws IllegalStateException if the cluster rejects the submission (the original
     *                               Storm exception is attached as the cause)
     */
    public static void main(String[] args) {
        // Kafka spout configuration.
        BrokerHosts brokerHosts = new ZkHosts("zeb,yjd,ylh");
        SpoutConfig spoutConfig = new SpoutConfig(brokerHosts, "yjd", "/storm", "kafkaspout");
        spoutConfig.scheme = new SchemeAsMultiScheme(new MessageScheme());

        // Maps tuple fields to HBase: row key = "word", column family "result", column "count".
        SimpleHBaseMapper mapper = new SimpleHBaseMapper();
        mapper.withColumnFamily("result");
        mapper.withColumnFields(new Fields("count"));
        mapper.withRowKeyField("word");

        // HBase client settings; handed to the bolt through the topology config key "hbase.conf".
        Map<String, Object> hbaseSettings = Maps.newTreeMap();
        hbaseSettings.put("hbase.rootdir", "hdfs://zeb:9000/hbase");
        hbaseSettings.put("hbase.zookeeper.quorum", "zeb:2181,yjd:2181,ylh:2181");

        // The bolt writes into table "wc".
        HBaseBolt hBaseBolt = new HBaseBolt("wc", mapper).withConfigKey("hbase.conf");

        Config conf = new Config();
        conf.setDebug(true);
        conf.put("hbase.conf", hbaseSettings);

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("spout", new KafkaSpout(spoutConfig));
        builder.setBolt("split", new LevelSplit(), 1).shuffleGrouping("spout");
        // Group by word so that a given word always reaches the same counting task.
        builder.setBolt("count", new LevelCount(), 1).fieldsGrouping("split", new Fields("word"));
        builder.setBolt("hbase", hBaseBolt, 1).shuffleGrouping("count");

        if (args != null && args.length > 0) {
            // Cluster mode: a failed submission must not look like a successful run,
            // so the failure is propagated instead of only being printed.
            try {
                StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
            } catch (AlreadyAliveException e) {
                throw new IllegalStateException(
                        "Topology '" + args[0] + "' is already running on the cluster", e);
            } catch (InvalidTopologyException e) {
                throw new IllegalStateException(
                        "Topology '" + args[0] + "' was rejected as invalid", e);
            }
        } else {
            // Local mode: always shut the in-process cluster down, even when
            // submission or the wait fails with an exception.
            LocalCluster cluster = new LocalCluster();
            try {
                cluster.submitTopology("Topotest1121", conf, builder.createTopology());
                Utils.sleep(1000000);
                cluster.killTopology("Topotest1121");
            } finally {
                cluster.shutdown();
            }
        }
    }
}

  

(5)在kafka端用控制台生产数据,如下:

 

kafka+storm+hbase详解编程语言

 

2、运行结果截图:

 kafka+storm+hbase详解编程语言

 

3、遇到的问题:

(1)把所有的工作做好后,提交了拓扑,运行代码。发生了错误1,如下:

 kafka+storm+hbase详解编程语言

 

解决:原因是各依赖的版本不一致,将版本修改一致后,问题成功解决。

(2)发生了错误2,如下:

 kafka+storm+hbase详解编程语言

 

解决:原因是忘记启动hbase的HMaster和HRegionServer,启动后问题成功解决。

原创文章,作者:ItWorker,如若转载,请注明出处:https://blog.ytso.com/11771.html

(0)
上一篇 2021年7月19日
下一篇 2021年7月19日

相关推荐

发表回复

登录后才能评论