flink无法写入数据到ES中

classic Classic list List threaded Threaded
2 messages Options
Reply | Threaded
Open this post in threaded view
|

flink无法写入数据到ES中

小墨鱼
我在使用Flink写入数据到ES中,程序可以执行成功但是ES中没有数据,而且没有任何报错信息我首先创建了一个sink的es表String sql =
"CREATE TABLE es_sink (\n" +                "uid INT,\n" +              
"appid INT,\n" +                "prepage_id INT,\n" +              
"page_id INT,\n" +                "action_id STRING,\n" +              
"page_name STRING,\n" +                "action_name STRING,\n" +              
"prepage_name STRING,\n" +                "stat_time BIGINT,\n" +              
"dt DATE,\n" +                "PRIMARY KEY (uid) NOT ENFORCED\n" +              
") WITH (\n" +                "'connector.type' = 'elasticsearch',\n" +              
"'connector.version' = '6',\n" +                "'connector.hosts' =
'<a href="http://localhost:9200'">http://localhost:9200',\n" +                "'connector.index' =
'mytest',\n" +                "'connector.document-type' = 'user_action',\n"
+                "'update-mode' = 'append',\n" +              
"'connector.key-null-literal' = 'n/a',\n" +              
"'connector.bulk-flush.max-actions' = '1',\n" +              
"'format.type' = 'json'\n" +                ")";并通过下面查询出数据String sql =
"select 1 as uid,2 as appid,3 as prepage_id,4 as page_id,'5' as
action_id,'6' as page_name,'7' as action_name,'8' as prepage_name,cast(9 as
bigint) as stat_time, cast('2020-11-11' as date) as dt from student limit
1";我的flink版本是1.11.1,es版本是6.2.2有遇到的朋友可以帮助我看一下



--
Sent from: http://apache-flink.147419.n8.nabble.com/
Reply | Threaded
Open this post in threaded view
|

Re: flink无法写入数据到ES中

Evan
你的SQL语句语法有误,请参考:
https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/connectors/elasticsearch.html 

希望能帮助到你!

 
发件人: 小墨鱼
发送时间: 2020-12-11 14:46
收件人: user-zh
主题: flink无法写入数据到ES中
我在使用Flink写入数据到ES中,程序可以执行成功但是ES中没有数据,而且没有任何报错信息我首先创建了一个sink的es表String sql =
"CREATE TABLE es_sink (\n" +                "uid INT,\n" +              
"appid INT,\n" +                "prepage_id INT,\n" +              
"page_id INT,\n" +                "action_id STRING,\n" +              
"page_name STRING,\n" +                "action_name STRING,\n" +              
"prepage_name STRING,\n" +                "stat_time BIGINT,\n" +              
"dt DATE,\n" +                "PRIMARY KEY (uid) NOT ENFORCED\n" +              
") WITH (\n" +                "'connector.type' = 'elasticsearch',\n" +              
"'connector.version' = '6',\n" +                "'connector.hosts' =
'<a href="http://localhost:9200'">http://localhost:9200',\n" +                "'connector.index' =
'mytest',\n" +                "'connector.document-type' = 'user_action',\n"
+                "'update-mode' = 'append',\n" +              
"'connector.key-null-literal' = 'n/a',\n" +              
"'connector.bulk-flush.max-actions' = '1',\n" +              
"'format.type' = 'json'\n" +                ")";并通过下面查询出数据String sql =
"select 1 as uid,2 as appid,3 as prepage_id,4 as page_id,'5' as
action_id,'6' as page_name,'7' as action_name,'8' as prepage_name,cast(9 as
bigint) as stat_time, cast('2020-11-11' as date) as dt from student limit
1";我的flink版本是1.11.1,es版本是6.2.2有遇到的朋友可以帮助我看一下
 
 
 
--
Sent from: http://apache-flink.147419.n8.nabble.com/