1. Create an IntelliJ IDEA project for the AsyncHbaseEventSerializer implementation
Add the following dependency to pom.xml:
<dependency>
<groupId>org.apache.flume.flume-ng-sinks</groupId>
<artifactId>flume-ng-hbase-sink</artifactId>
<version>1.6.0</version>
</dependency>
Implement AsyncHbaseEventSerializer according to your business requirements:
import org.apache.flume.Context; import org.apache.flume.Event; import org.apache.flume.conf.ComponentConfiguration; import org.apache.flume.sink.hbase.AsyncHbaseEventSerializer; import org.hbase.async.AtomicIncrementRequest; import org.hbase.async.PutRequest; import java.util.ArrayList; import java.util.List; /** * Created by root on 12/5/17. */ public class SplittingSerializer implements AsyncHbaseEventSerializer { private byte[] table; private byte[] colFam; private Event currentEvent; private byte[][]rentRowKey; private final byte[] eventCountCol = "eventCount".getBytes(); columnNames; private final List<PutRequest> puts = new ArrayList<PutRequest>(); private final List<AtomicIncrementRequest> incs = new ArrayList<AtomicIncrementRequest>(); private byte[] cur public void initialize(byte[] table, byte[] cf) { this.table = table; this.colFam = cf; //Can not get the columns from context in configure method. Had to hard coded here. columnNames = new byte[3][]; columnNames[0] = "name".getBytes(); columnNames[1] = "id".getBytes(); columnNames[2] = "phone".getBytes(); } public void setEvent(Event event) { // Set the event and verify that the rowKey is not present this.currentEvent = event; /* //Don't know how to set the key of event header. String rowKeyStr = currentEvent.getHeaders().get("rowKey"); if (rowKeyStr == null) { throw new FlumeException("No row key found in headers!"); } currentRowKey = rowKeyStr.getBytes();*/ } public List<PutRequest> getActions() { // Split the event body and get the values for the columns String eventStr = new String(currentEvent.getBody()); String[] cols = eventStr.split(","); Long currTime = System.currentTimeMillis(); long revTs = Long.MAX_VALUE – currTime; currentRowKey = (Long.toString(revTs) + cols[0]).getBytes(); puts.clear(); for (int i = 0; i < cols.length; i++) { //Generate a PutRequest for each column. 
PutRequest req = new PutRequest(table, currentRowKey, colFam, columnNames[i], cols[i].getBytes()); puts.add(req); } return puts; } public List<AtomicIncrementRequest> getIncrements() { incs.clear(); //Increment the number of events received incs.add(new AtomicIncrementRequest(table, "totalEvents".getBytes(), colFam, eventCountCol)); return incs; } public void cleanUp() { table = null; colFam = null; currentEvent = null; columnNames = null; currentRowKey = null; } public void configure(Context context) { //Get the column names from the configuration //Did not work. Don't know how to use it. String cols = new String(context.getString("columns")); String[] names = cols.split(","); byte[][] columnNames = new byte[names.length][]; int i = 0; System.out.println("getting columnNames"); for(String name : names) { columnNames[i++] = name.getBytes(); } } public void configure(ComponentConfiguration componentConfiguration) { } } |
Build and deploy the JAR file:
In IntelliJ IDEA, choose Build -> Build Artifacts.
Copy the JAR into Flume's lib directory. Here I use scp to upload it to the Flume installation on another host.
2. Configure Flume
a1.sources = r1
a1.channels = c1 c2
a1.sinks = k1 sink2

# NOTE: channel c1 and sink k1 are listed above but are not defined in this excerpt.
# Define them (e.g. an HDFS sink k1 reading from c1) or remove them from the lists above.

# NetCat TCP source, replicating every event to both channels
a1.sources.r1.type = netcat
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 6666
a1.sources.r1.channels = c1 c2
a1.sources.r1.selector.type = replicating

# Memory channel feeding the HBase sink
a1.channels.c2.type = memory
a1.channels.c2.capacity = 10000
a1.channels.c2.transactionCapacity = 1000

# HBase sink
a1.sinks.sink2.type = org.apache.flume.sink.hbase.AsyncHBaseSink
a1.sinks.sink2.channel = c2
a1.sinks.sink2.table = law
a1.sinks.sink2.columnFamily = lawfile
a1.sinks.sink2.batchSize = 5000
# The serializer to use
a1.sinks.sink2.serializer = ifre.flume.hbase.SplittingSerializer
# List of columns each event writes to (passed to the serializer as "columns")
a1.sinks.sink2.serializer.columns = name,id,phone
3. Create the HBase table
# hbase shell
hbase(main):001:0> create 'law', 'lawfile'
4. Run the Flume agent
[root@ifrebigsearch2 apache-flume-1.6.0-bin]# bin/flume-ng agent --conf conf --conf-file conf/crawler-hdfs-conf.properties --name a1 -Dflume.root.logger=INFO,console
5. Send a test event with nc
[root@ifrebigsearch0 dkh]# nc ifrebigsearch2 6666
zhangsan,10110198806054561,13812345678
OK
6. Result
Each comma-separated field is stored as its own cell under a row key made of the reversed timestamp followed by the first field:
hbase(main):002:0> scan 'law'
ROW                              COLUMN+CELL
 9223370524386395508zhangsan     column=lawfile:id, timestamp=1512468380362, value=10110198806054561
 9223370524386395508zhangsan     column=lawfile:name, timestamp=1512468380361, value=zhangsan
 9223370524386395508zhangsan     column=lawfile:phone, timestamp=1512468380363, value=13812345678
原创文章,作者:3628473679,如若转载,请注明出处:https://blog.ytso.com/tech/opensource/195578.html