Flink1.11执行sql当判空使用<> null,程序直接结束

classic Classic list List threaded Threaded
4 messages Options
Reply | Threaded
Open this post in threaded view
|

Flink1.11执行sql当判空使用<> null,程序直接结束

datayangl
环境:flink1.11:
代码如下:
val dataStreamEnv: StreamExecutionEnvironment = FlinkUtils.streamEnv
val tableEnv: StreamTableEnvironment = FlinkUtils.streamTableEnv
val sql = """SELECT
      CASE
        WHEN
          kafka_table.log_type = 'detect'
          AND
          kafka_table.event_level = 3
        THEN 3
        ELSE 0
      END as weight,
      kafka_table.src_ip as kafka_table_src_ip_0,
          kafka_table.dev_type as kafka_table_dev_type_0
    FROM
      kafka_table
    WHERE
      kafka_table.event_time >= unix_timestamp() - 60 * 60 * 5
      AND
      kafka_table.src_ip <> null
          AND
          kafka_table.event_level > 0
          AND
          kafka_table.dev_type = 1


val data:Table = tableEnv.sqlQuery(sql)
val result = tableEnv.toRetractStream[Row](data)
result.print("====>")
"""



现象:如果判空条件为kafka_table.src_ip <> null,则程序直接结束,没有任何报错,而使用kafka_table.src_ip is
not null 可以正常运行并一直产生数据。

疑问:我明白is not null是正确的用法,问题是用<> null 为什么程序会直接结束而且没有任何报错,感觉像是当作批处理去运行了。



--
Sent from: http://apache-flink.147419.n8.nabble.com/
Reply | Threaded
Open this post in threaded view
|

Re: Flink1.11执行sql当判空使用<> null,程序直接结束

Benchao Li-2
Hi datayangl,

这是因为kafka_table.src_ip <>
null是恒等于false的,所以这个计算过程就被优化掉了,最后你的作业的逻辑就变成了一个单纯的values,里面没有一条数据。

关于为什么kafka_table.src_ip <> null,这个可以了解一下关于three-value-logic[1].
简单来说,在标准SQL里面,boolean类型是有三种值的,正常的= <>这种算子跟null比较的时候,结果都是unknown,
然后这个在filter条件里面会被视作false。

[1] https://modern-sql.com/concept/three-valued-logic

datayangl <[hidden email]> 于2021年3月19日周五 下午4:02写道:

> 环境:flink1.11:
> 代码如下:
> val dataStreamEnv: StreamExecutionEnvironment = FlinkUtils.streamEnv
> val tableEnv: StreamTableEnvironment = FlinkUtils.streamTableEnv
> val sql = """SELECT
>       CASE
>         WHEN
>           kafka_table.log_type = 'detect'
>           AND
>           kafka_table.event_level = 3
>         THEN 3
>         ELSE 0
>       END as weight,
>       kafka_table.src_ip as kafka_table_src_ip_0,
>           kafka_table.dev_type as kafka_table_dev_type_0
>     FROM
>       kafka_table
>     WHERE
>       kafka_table.event_time >= unix_timestamp() - 60 * 60 * 5
>       AND
>       kafka_table.src_ip <> null
>           AND
>           kafka_table.event_level > 0
>           AND
>           kafka_table.dev_type = 1
>
>
> val data:Table = tableEnv.sqlQuery(sql)
> val result = tableEnv.toRetractStream[Row](data)
> result.print("====>")
> """
>
>
>
> 现象:如果判空条件为kafka_table.src_ip <> null,则程序直接结束,没有任何报错,而使用kafka_table.src_ip
> is
> not null 可以正常运行并一直产生数据。
>
> 疑问:我明白is not null是正确的用法,问题是用<> null 为什么程序会直接结束而且没有任何报错,感觉像是当作批处理去运行了。
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


--

Best,
Benchao Li
Reply | Threaded
Open this post in threaded view
|

Re: Flink1.11执行sql当判空使用<> null,程序直接结束

datayangl
calcite解析将<> null 解析为unknown, 在flink优化阶段直接将unkown这个条件默认视为false,通过规则匹配
将整条sql优化为values(没有任何结果的sql),于是直接将程序的source task finish了。这个过程我理解的对吗?



--
Sent from: http://apache-flink.147419.n8.nabble.com/
Reply | Threaded
Open this post in threaded view
|

Re: Flink1.11执行sql当判空使用<> null,程序直接结束

Benchao Li-2
嗯,是这样的。

datayangl <[hidden email]> 于2021年3月19日周五 下午5:55写道:

> calcite解析将<> null 解析为unknown, 在flink优化阶段直接将unkown这个条件默认视为false,通过规则匹配
> 将整条sql优化为values(没有任何结果的sql),于是直接将程序的source task finish了。这个过程我理解的对吗?
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/



--

Best,
Benchao Li