Druid的单机版安装参考:https://blog.51cto.com/10120275/2429912
Druid实时接入Kafka的过程
下载、安装、启动kafka过程:
wget http://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.2.1/kafka_2.11-2.2.1.tgz
tar -zxvf kafka_2.11-2.2.1.tgz
ln -s kafka_2.11-2.2.1 kafka
$KAFKA_HOME/kafka-server-start.sh ~/kafka/config/server.properties 1>/dev/null 2>&1 &
创建topic : wikipedia./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic wikipedia
解压wikiticker-2015-09-12-sampled.json.gz文件,这个步骤是给kafka topic准备输入文件
cd $DRUID_HOME/quickstart/tutorial
gunzip -k wikiticker-2015-09-12-sampled.json.gz
这个步骤操作完成后,在$DRUID_HOME/quickstart/tutorial文件夹下生成wikiticker-2015-09-12-sampled.json
上图配置文件如下,其中bootstrap.servers配置kafka地址
{
"type": "kafka",
"dataSchema": {
"dataSource": "wikipedia",
"parser": {
"type": "string",
"parseSpec": {
"format": "json",
"timestampSpec": {
"column": "time",
"format": "auto"
},
"dimensionsSpec": {
"dimensions": [
"channel",
"cityName",
"comment",
"countryIsoCode",
"countryName",
"isAnonymous",
"isMinor",
"isNew",
"isRobot",
"isUnpatrolled",
"metroCode",
"namespace",
"page",
"regionIsoCode",
"regionName",
"user",
{ "name": "added", "type": "long" },
{ "name": "deleted", "type": "long" },
{ "name": "delta", "type": "long" }
]
}
}
},
"metricsSpec" : [],
"granularitySpec": {
"type": "uniform",
"segmentGranularity": "DAY",
"queryGranularity": "NONE",
"rollup": false
}
},
"tuningConfig": {
"type": "kafka",
"reportParseExceptions": false
},
"ioConfig": {
"topic": "wikipedia",
"replicas": 2,
"taskDuration": "PT10M",
"completionTimeout": "PT20M",
"consumerProperties": {
"bootstrap.servers": "localhost:9092"
}
}
}
接下来要将wikiticker-2015-09-12-sampled.json文件内容,利用kafka生产者脚本写入wikipedia的topic中
export KAFKA_OPTS="-Dfile.encoding=UTF-8"
./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic wikipedia < {PATH_TO_DRUID}/quickstart/tutorial/wikiticker-2015-09-12-sampled.json
原创文章,作者:Maggie-Hunter,如若转载,请注明出处:https://blog.ytso.com/194357.html