toAppendStream type mismatch problem

toAppendStream type mismatch problem

admin
I wrote a Table API/SQL program following the demo in Jark's blog, and it fails when converting the Table to an append stream.
Flink version: 1.10
The code:
public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // the old planner will be removed in a future release
    EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().build();
    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
    tableEnv.sqlUpdate("CREATE TABLE user_behavior (\n" +
            "    user_id BIGINT,\n" +
            "    item_id BIGINT,\n" +
            "    category_id BIGINT,\n" +
            "    behavior STRING,\n" +
            "    ts TIMESTAMP(3),\n" +
            "    proctime as PROCTIME(),   -- 通过计算列产生一个处理时间列\n" +
            "    WATERMARK FOR ts as ts - INTERVAL '5' SECOND  -- 在ts上定义watermark,ts成为事件时间列\n" +
            ") WITH (\n" +
            "    'connector.type' = 'kafka',  -- 使用 kafka connector\n" +
            "    'connector.version' = 'universal',  -- kafka 版本,universal 支持 0.11 以上的版本\n" +
            "    'connector.topic' = 'user_behavior',  -- kafka topic\n" +
            "    'connector.startup-mode' = 'earliest-offset',  -- 从起始 offset 开始读取\n" +
            "    'connector.properties.zookeeper.connect' = 'localhost:2181',  -- zookeeper 地址\n" +
            "    'connector.properties.bootstrap.servers' = 'localhost:9092',  -- kafka broker 地址\n" +
            "    'format.type' = 'json'  -- 数据源格式为 json\n" +
            ")");
    Table table1 = tableEnv.sqlQuery("select user_id,item_id,category_id,behavior,ts," +
            "proctime from user_behavior where behavior='buy'");
    tableEnv.toAppendStream(table1, Behavior.class).print();
    env.execute();

}

import java.sql.Timestamp;

public class Behavior {
    public Long user_id;
    public Long item_id;
    public Long category_id;
    public String behavior;
    public Timestamp ts;
    public Timestamp proctime;


    @Override
    public String toString() {
        return "Behavior{" +
                "user_id=" + user_id +
                ", item_id=" + item_id +
                ", category_id=" + category_id +
                ", behavior='" + behavior + '\'' +
                ", ts=" + ts +
                ", proctime=" + proctime +
                '}';
    }
}
The error:
Exception in thread "main" org.apache.flink.table.api.ValidationException: Field types of query result and registered TableSink  do not match.
Query schema: [user_id: BIGINT, item_id: BIGINT, category_id: BIGINT, behavior: STRING, ts: TIMESTAMP(3) *ROWTIME*, proctime: TIMESTAMP(3) NOT NULL *PROCTIME*]
Sink schema: [behavior: STRING, category_id: BIGINT, item_id: BIGINT, proctime: TIMESTAMP(3), ts: TIMESTAMP(3), user_id: BIGINT]
        at org.apache.flink.table.planner.sinks.TableSinkUtils$.validateSchemaAndApplyImplicitCast(TableSinkUtils.scala:96)
        at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:229)
        at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150)
        at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.Iterator$class.foreach(Iterator.scala:891)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
        at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
        at scala.collection.AbstractTraversable.map(Traversable.scala:104)
        at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:150)
        at org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.java:351)
        at org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:259)
        at org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:250)
        at sql.KafkaSourceTable.main(KafkaSourceTable.java:35)

The POJO's field types are consistent with the source table's column types,
so why does validation still check NOT NULL *PROCTIME* and *ROWTIME*?

Re: toAppendStream type mismatch problem

Jark
Administrator
This looks like a bug that has already been fixed (FLINK-16108).
Could you try again with release-1.10.1, which is currently going through RC?
https://dist.apache.org/repos/dist/dev/flink/flink-1.10.1-rc2/
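
If upgrading is not convenient right away, a possible workaround (just a sketch, not verified against 1.10.0) is to CAST the two time attributes back to plain TIMESTAMP(3) in the query. Casting a time attribute materializes it, so the query schema should then carry ordinary TIMESTAMP(3) columns that match the POJO's Timestamp fields:

// A sketch, not a verified fix: cast(...) materializes the time attributes,
// so ts loses *ROWTIME* and proctime loses NOT NULL *PROCTIME*.
Table table1 = tableEnv.sqlQuery(
        "select user_id, item_id, category_id, behavior, " +
        "cast(ts as TIMESTAMP(3)) as ts, " +
        "cast(proctime as TIMESTAMP(3)) as proctime " +
        "from user_behavior where behavior = 'buy'");
tableEnv.toAppendStream(table1, Behavior.class).print();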

Best,
Jark


Re: toAppendStream type mismatch problem

admin
OK, I'll give it a try. Thanks!




Sun.Zhu
Email: [hidden email]
