【Druid】Druid读取Kafka数据的简单配置过程

Druid的单机版安装参考:https://blog.51cto.com/10120275/2429912

Druid实时接入Kafka的过程

下载、安装、启动kafka过程:

wget http://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.2.1/kafka_2.11-2.2.1.tgz
tar -zxvf kafka_2.11-2.2.1.tgz
ln -s kafka_2.11-2.2.1 kafka
$KAFKA_HOME/kafka-server-start.sh ~/kafka/config/server.properties 1>/dev/null 2>&1 &

创建topic : wikipedia
./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic wikipedia

解压wikiticker-2015-09-12-sampled.json.gz文件,这个步骤是给kafka topic准备输入文件

cd $DRUID_HOME/quickstart/tutorial
gunzip -k wikiticker-2015-09-12-sampled.json.gz

这个步骤操作完成后,在$DRUID_HOME/quickstart/tutorial文件夹下生成wikiticker-2015-09-12-sampled.json

【Druid】Druid读取Kafka数据的简单配置过程

上图配置文件如下,其中bootstrap.servers配置kafka地址

{
  "type": "kafka",
  "dataSchema": {
    "dataSource": "wikipedia",
    "parser": {
      "type": "string",
      "parseSpec": {
        "format": "json",
        "timestampSpec": {
          "column": "time",
          "format": "auto"
        },
        "dimensionsSpec": {
          "dimensions": [
            "channel",
            "cityName",
            "comment",
            "countryIsoCode",
            "countryName",
            "isAnonymous",
            "isMinor",
            "isNew",
            "isRobot",
            "isUnpatrolled",
            "metroCode",
            "namespace",
            "page",
            "regionIsoCode",
            "regionName",
            "user",
            { "name": "added", "type": "long" },
            { "name": "deleted", "type": "long" },
            { "name": "delta", "type": "long" }
          ]
        }
      }
    },
    "metricsSpec" : [],
    "granularitySpec": {
      "type": "uniform",
      "segmentGranularity": "DAY",
      "queryGranularity": "NONE",
      "rollup": false
    }
  },
  "tuningConfig": {
    "type": "kafka",
    "reportParseExceptions": false
  },
  "ioConfig": {
    "topic": "wikipedia",
    "replicas": 2,
    "taskDuration": "PT10M",
    "completionTimeout": "PT20M",
    "consumerProperties": {
      "bootstrap.servers": "localhost:9092" 
    }
  }
}

接下来要将wikiticker-2015-09-12-sampled.json文件内容,利用kafka生产者脚本写入wikipedia的topic中

export KAFKA_OPTS="-Dfile.encoding=UTF-8"
./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic wikipedia < {PATH_TO_DRUID}/quickstart/tutorial/wikiticker-2015-09-12-sampled.json

原创文章,作者:Maggie-Hunter,如若转载,请注明出处:https://blog.ytso.com/194357.html

(0)
上一篇 2021年11月16日
下一篇 2021年11月16日

相关推荐

发表回复

登录后才能评论