本篇文章给大家分享如何使用 Flink Connectors 连接 Elasticsearch。小编觉得挺实用的,因此分享给大家学习,希望大家阅读完这篇文章后能有所收获。话不多说,跟着小编一起来看看吧。
通过 Flink DataStream Connectors(数据流连接器)连接到 Elasticsearch 搜索引擎中的文档索引(Index),实现数据流的输入与输出操作。
示例环境
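java.version: 1.8.x
flink.version: 1.11.1
elasticsearch: 6.x
注意:下文的 DataStreamSink 示例导入的是 elasticsearch7 连接器(org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink);若目标集群为 6.x,应改用 flink-connector-elasticsearch6 中的 org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink。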
示例数据源 (项目码云下载)
Flink 系列 之 搭建开发环境与数据
示例模块 (pom.xml)
Flink 系列 之 DataStream Connectors 与 示例模块
数据流输入
DataStreamSource.java
package com.flink.examples.elasticsearch; import com.flink.examples.TUser; import com.google.gson.Gson; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.source.RichSourceFunction; import org.apache.http.Header; import org.apache.http.HttpHost; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.client.RestClient; import org.elasticsearch.client.RestClientBuilder; import org.elasticsearch.client.RestHighLevelClient; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchHits; import org.elasticsearch.search.builder.SearchSourceBuilder; import java.io.IOException; import java.util.Map; /** * @Description 从elasticsearch中获取数据并输出到DataStream数据流中 */ public class DataStreamSource { /** * 官方文档:https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/connectors/elasticsearch.html */ public static void main(String[] args) throws Exception { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); DataStream<TUser> dataStream = env.addSource(new RichSourceFunction<TUser>(){ private RestClientBuilder builder = null; //job开始执行,调用此方法创建数据源连接对象,该方法主要用于打开连接 @Override public void open(Configuration parameters) throws Exception { super.open(parameters); builder = RestClient.builder(new HttpHost("192.168.110.35", 9200, "http")); } //执行查询并对数据进行封装 @Override public void run(SourceContext<TUser> ctx) throws Exception { Gson gson = new Gson(); RestHighLevelClient client = null; //匹配查询 SearchSourceBuilder sourceBuilder = new SearchSourceBuilder(); sourceBuilder.query(QueryBuilders.matchQuery("sex", 1)); //定义索引库 SearchRequest request = new SearchRequest(); request.types("doc"); 
request.indices("flink_demo"); request.source(sourceBuilder); try { client = new RestHighLevelClient(builder); SearchResponse response = client.search(request, new Header[]{}); SearchHits hits = response.getHits(); System.out.println("查询结果有" + hits.getTotalHits() + "条"); for (SearchHit searchHits : hits ) { Map<String,Object> dataMap = searchHits.getSourceAsMap(); TUser user = gson.fromJson(gson.toJson(dataMap), TUser.class); ctx.collect(user); } //ID查询 // GetRequest request = new GetRequest( "flink_demo","doc","NeMaoXQBElQ9wTD5MOfB"); // client = new RestHighLevelClient(builder); // GetResponse getResponse = client.get(request, new Header[]{}); // Map<String,Object> dataMap = getResponse.getSourceAsMap(); // TUser user = gson.fromJson(gson.toJson(dataMap), TUser.class); // ctx.collect(user); }catch(IOException ioe){ ioe.printStackTrace(); }finally { if (client != null){ client.close(); } } } //Job结束时调用 @Override public void cancel() { try { super.close(); } catch (Exception e) { } builder = null; } }); dataStream.print(); env.execute("flink es to data job"); } }
数据流输出
DataStreamSink.java
package com.flink.examples.elasticsearch; import com.flink.examples.TUser; import com.google.gson.Gson; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction; import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer; import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink; import org.apache.http.HttpHost; import org.elasticsearch.client.Requests; import java.util.ArrayList; import java.util.List; import java.util.Map; /** * @Description 将DataStream数据流输出到elasticsearch中 */ public class DataStreamSink { /** * 官方文档:https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/connectors/elasticsearch.html */ public static void main(String[] args) throws Exception { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.enableCheckpointing(5000); env.setParallelism(2); //1.设置Elasticsearch连接,创建索引数据 List<HttpHost> httpHosts = new ArrayList<>(); httpHosts.add(new HttpHost("192.168.110.35", 9200, "http")); //创建数据源对象 ElasticsearchSink ElasticsearchSink.Builder<String> esSinkBuilder = new ElasticsearchSink.Builder<String>(httpHosts, new ElasticsearchSinkFunction<String>() { @Override public void process(String user, RuntimeContext ctx, RequestIndexer indexer) { Gson gson = new Gson(); Map<String,Object> map = gson.fromJson(user, Map.class); indexer.add(Requests.indexRequest() .index("flink_demo") .type("doc") .source(map)); } } ); // 设置批量写数据的最大动作量,对批量请求的配置;这指示接收器在每个元素之后发出,否则它们将被缓冲 esSinkBuilder.setBulkFlushMaxActions(10); //刷新前缓冲区的最大数据大小(以MB为单位) esSinkBuilder.setBulkFlushMaxSizeMb(500); //论缓冲操作的数量或大小如何都要刷新的时间间隔 esSinkBuilder.setBulkFlushInterval(4000); //2.写入数据到流中 //封装数据 TUser user = new TUser(); 
user.setId(9); user.setName("wang1"); user.setAge(23); user.setSex(1); user.setAddress("CN"); user.setCreateTimeSeries(System.currentTimeMillis()); DataStream<String> input = env.fromElements(user).map((MapFunction<TUser, String>) value -> new Gson().toJson(value)); //3.将数据写入到Elasticearch中 input.addSink(esSinkBuilder.build()); env.execute("flink data to es job"); } }
数据展示
以上就是使用 Connectors 连接 Elasticsearch 的全部内容。小编相信其中有部分知识点是我们日常工作中会见到或用到的,希望你能通过这篇文章学到更多知识。更多详情敬请关注亿速云行业资讯频道。
原创文章,作者:Maggie-Hunter,如若转载,请注明出处:https://blog.ytso.com/tech/opensource/223164.html