回复:flink-1.11 ddl kafka-to-hive问题

classic Classic list List threaded Threaded
1 message Options
kcz
Reply | Threaded
Open this post in threaded view
|

回复:flink-1.11 ddl kafka-to-hive问题

kcz
谢谢大佬们,公众号有demo了,我去对比一下看看





------------------ 原始邮件 ------------------
发件人: Jingsong Li <[hidden email]&gt;
发送时间: 2020年7月22日 09:34
收件人: user-zh <[hidden email]&gt;
主题: 回复:flink-1.11 ddl kafka-to-hive问题



你的Source表是怎么定义的?确定有watermark前进吗?(可以看Flink UI)

'sink.partition-commit.trigger'='partition-time' 去掉试试?

Best,
Jingsong

On Wed, Jul 22, 2020 at 12:02 AM Leonard Xu <[hidden email]&gt; wrote:

&gt; HI,
&gt;
&gt; Hive 表时在flink里建的吗? 如果是建表时使用了hive dialect吗?可以参考[1]设置下
&gt;
&gt; Best
&gt; Leonard Xu
&gt; [1]
&gt; https://ci.apache.org/projects/flink/flink-docs-master/dev/table/hive/hive_dialect.html#use-hive-dialect
&gt; <
&gt; https://ci.apache.org/projects/flink/flink-docs-master/dev/table/hive/hive_dialect.html#use-hive-dialect
&gt; &gt;
&gt;
&gt; &gt; 在 2020年7月21日,22:57,kcz <[hidden email]&gt; 写道:
&gt; &gt;
&gt; &gt; 一直都木有数据 我也不知道哪里不太对 hive有这个表了已经。我测试写ddl hdfs 是OK的
&gt; &gt;
&gt; &gt;
&gt; &gt;
&gt; &gt;
&gt; &gt;
&gt; &gt; ------------------ 原始邮件 ------------------
&gt; &gt; 发件人: JasonLee <[hidden email] <mailto:[hidden email]&gt;&amp;gt;
&gt; &gt; 发送时间: 2020年7月21日 20:39
&gt; &gt; 收件人: user-zh <[hidden email] <mailto:[hidden email]
&gt; &gt;&amp;gt;
&gt; &gt; 主题: 回复:flink-1.11 ddl kafka-to-hive问题
&gt; &gt;
&gt; &gt;
&gt; &gt;
&gt; &gt; hi
&gt; &gt; hive表是一直没有数据还是过一段时间就有数据了?
&gt; &gt;
&gt; &gt;
&gt; &gt; | |
&gt; &gt; JasonLee
&gt; &gt; |
&gt; &gt; |
&gt; &gt; 邮箱:[hidden email]
&gt; &gt; |
&gt; &gt;
&gt; &gt; Signature is customized by Netease Mail Master
&gt; &gt;
&gt; &gt; 在2020年07月21日 19:09,kcz 写道:
&gt; &gt; hive-1.2.1
&gt; &gt; chk 已经成功了(去chk目录查看了的确有chk数据,kafka也有数据),但是hive表没有数据,我是哪里缺少了什么吗?
&gt; &gt; String hiveSql = "CREATE&amp;nbsp; TABLE&amp;nbsp; stream_tmp.fs_table (\n" +
&gt; &gt; &amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; "&amp;nbsp; host STRING,\n" +
&gt; &gt; &amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; "&amp;nbsp; url STRING," +
&gt; &gt; &amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; "&amp;nbsp; public_date STRING" +
&gt; &gt; &amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; ") partitioned by (public_date
&gt; string) " +
&gt; &gt; &amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; "stored as PARQUET " +
&gt; &gt; &amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; "TBLPROPERTIES (\n" +
&gt; &gt; &amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; "&amp;nbsp;
&gt; 'sink.partition-commit.delay'='0 s',\n" +
&gt; &gt; &amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; "&amp;nbsp;
&gt; 'sink.partition-commit.trigger'='partition-time',\n" +
&gt; &gt; &amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; "&amp;nbsp;
&gt; 'sink.partition-commit.policy.kind'='metastore,success-file'" +
&gt; &gt; &amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; ")";
&gt; &gt; tableEnv.executeSql(hiveSql);
&gt; &gt;
&gt; &gt;
&gt; &gt; tableEnv.executeSql("INSERT INTO&amp;nbsp; stream_tmp.fs_table SELECT host,
&gt; url, DATE_FORMAT(public_date, 'yyyy-MM-dd') FROM stream_tmp.source_table");
&gt;
&gt;

--
Best, Jingsong Lee