我在使用Flink写入数据到ES中,程序可以执行成功但是ES中没有数据,而且没有任何报错信息我首先创建了一个sink的es表String sql =
"CREATE TABLE es_sink (\n" + "uid INT,\n" + "appid INT,\n" + "prepage_id INT,\n" + "page_id INT,\n" + "action_id STRING,\n" + "page_name STRING,\n" + "action_name STRING,\n" + "prepage_name STRING,\n" + "stat_time BIGINT,\n" + "dt DATE,\n" + "PRIMARY KEY (uid) NOT ENFORCED\n" + ") WITH (\n" + "'connector.type' = 'elasticsearch',\n" + "'connector.version' = '6',\n" + "'connector.hosts' = '<a href="http://localhost:9200'">http://localhost:9200',\n" + "'connector.index' = 'mytest',\n" + "'connector.document-type' = 'user_action',\n" + "'update-mode' = 'append',\n" + "'connector.key-null-literal' = 'n/a',\n" + "'connector.bulk-flush.max-actions' = '1',\n" + "'format.type' = 'json'\n" + ")";并通过下面查询出数据String sql = "select 1 as uid,2 as appid,3 as prepage_id,4 as page_id,'5' as action_id,'6' as page_name,'7' as action_name,'8' as prepage_name,cast(9 as bigint) as stat_time, cast('2020-11-11' as date) as dt from student limit 1";我的flink版本是1.11.1,es版本是6.2.2有遇到的朋友可以帮助我看一下 -- Sent from: http://apache-flink.147419.n8.nabble.com/ |
你的SQL语句语法有误,请参考:
https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/connectors/elasticsearch.html 希望能帮助到你! 发件人: 小墨鱼 发送时间: 2020-12-11 14:46 收件人: user-zh 主题: flink无法写入数据到ES中 我在使用Flink写入数据到ES中,程序可以执行成功但是ES中没有数据,而且没有任何报错信息我首先创建了一个sink的es表String sql = "CREATE TABLE es_sink (\n" + "uid INT,\n" + "appid INT,\n" + "prepage_id INT,\n" + "page_id INT,\n" + "action_id STRING,\n" + "page_name STRING,\n" + "action_name STRING,\n" + "prepage_name STRING,\n" + "stat_time BIGINT,\n" + "dt DATE,\n" + "PRIMARY KEY (uid) NOT ENFORCED\n" + ") WITH (\n" + "'connector.type' = 'elasticsearch',\n" + "'connector.version' = '6',\n" + "'connector.hosts' = '<a href="http://localhost:9200'">http://localhost:9200',\n" + "'connector.index' = 'mytest',\n" + "'connector.document-type' = 'user_action',\n" + "'update-mode' = 'append',\n" + "'connector.key-null-literal' = 'n/a',\n" + "'connector.bulk-flush.max-actions' = '1',\n" + "'format.type' = 'json'\n" + ")";并通过下面查询出数据String sql = "select 1 as uid,2 as appid,3 as prepage_id,4 as page_id,'5' as action_id,'6' as page_name,'7' as action_name,'8' as prepage_name,cast(9 as bigint) as stat_time, cast('2020-11-11' as date) as dt from student limit 1";我的flink版本是1.11.1,es版本是6.2.2有遇到的朋友可以帮助我看一下 -- Sent from: http://apache-flink.147419.n8.nabble.com/ |
Free forum by Nabble | Edit this page |