最简单的sink to ElasticSearch场景,程序运行没有报错,但是ES里就是没写进去数据。
源代码如下: package etl.estest; import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.api.java.tuple.Tuple4; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction; import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer; import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; import org.apache.http.HttpHost; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.client.Request; import org.elasticsearch.client.Requests; import org.elasticsearch.client.Response; import org.elasticsearch.client.RestClient; import java.util.*; public class EsTest1 { public static void main(String[] args) throws Exception { test2(); } private static void test2() throws Exception{ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); Properties properties = new Properties(); properties.put("bootstrap.servers","10.67.18.100:9092"); properties.put("zookeeper.connect","10.67.18.100:2180"); properties.put("group.id","test-consumer-group"); FlinkKafkaConsumer<String> pas = new FlinkKafkaConsumer<String>("nms.pas", new SimpleStringSchema(), properties); DataStreamSource<String> pas_stream = env.addSource(pas); pas_stream.print(); List<HttpHost> httpHosts = new ArrayList<>(); httpHosts.add(new HttpHost("10.67.18.100", 9310, "http")); ElasticsearchSink.Builder<String> esSinkBuilder = new ElasticsearchSink.Builder<>( httpHosts, new ElasticsearchSinkFunction<String>() { public IndexRequest createIndexRequest(String element) { Map<String, String> json = new HashMap<>(); json.put("data", element); return Requests.indexRequest() .index("pas-meeting-data-2020.1.17") .type("_doc") .source(json); } @Override public void process(String element, RuntimeContext ctx, RequestIndexer indexer) { indexer.add(createIndexRequest(element)); } } ); pas_stream.addSink(esSinkBuilder.build()); env.execute("sss"); } } |
Free forum by Nabble | Edit this page |