求教flink自定义python udf时TIMESTAMP类型问题

classic Classic list List threaded Threaded
1 message Options
Reply | Threaded
Open this post in threaded view
|

求教flink自定义python udf时TIMESTAMP类型问题

元灵
大佬们:
       有没有遇到过使用python udf的时候 DataTypes.TIMESTAMP()类型不匹配的问题啊
       自定义代码是这个:
       @udf(input_types=[DataTypes.INT(), DataTypes.INT(), DataTypes.TIMESTAMP()], result_type=DataTypes.INT())
       def add_new(i, j, k):
            return i + j    #k没用我就是测试一下
       使用的时候:
       st_env.from_path("source") \
          .select("a, add_new(b, c, rowtime) as sumbc") \
          .insert_into("sink")
       报这个错误:
       py4j.protocol.Py4JJavaError: An error occurred while calling o80.select.
      : org.apache.flink.table.api.ValidationException: Given parameters do not match any signature.
       Actual: (java.lang.Integer, java.lang.Integer, java.time.LocalDateTime)
       Expected: (int, int, long)
       或者是不是哪里写错了,求指导:)