目前在学习使用pyflink的Table api,请教一个pyflink udf函数报错问题:
@udf(input_types=[DataTypes.STRING()], result_type=DataTypes.STRING()) def drop_fields(message, *fields): import json message = json.loads(message) for field in fields: message.pop(field) return json.dumps(message) st_env \ .form_path("source") \ .select("drop_fields(message,'x')") \ .insert_into("sink") message 格式: {“a”:"1","x","2"} 报错参数类型不匹配: Actual:(java.lang.String, java.lang.String) Expected:(org.apache.flink.table.dataformat.BinaryString) 新手入门,请多指教,感谢。 |
The input types should be as following:
input_types=[DataTypes.STRING(), DataTypes.ARRAY(DataTypes.STRING())] Regards, Dian > 在 2020年6月1日,上午10:49,刘亚坤 <[hidden email]> 写道: > > 目前在学习使用pyflink的Table api,请教一个pyflink udf函数报错问题: > > @udf(input_types=[DataTypes.STRING()], result_type=DataTypes.STRING()) > def drop_fields(message, *fields): > import json > message = json.loads(message) > for field in fields: > message.pop(field) > return json.dumps(message) > > > st_env \ > .form_path("source") \ > .select("drop_fields(message,'x')") \ > .insert_into("sink") > > message 格式: > {“a”:"1","x","2"} > > 报错参数类型不匹配: > Actual:(java.lang.String, java.lang.String) > Expected:(org.apache.flink.table.dataformat.BinaryString) > > 新手入门,请多指教,感谢。 |
是的,对应参数没有填写正确,感谢; 另外请教,udf函数是不是目前不支持python的可变参数,我使用可变参数依然会报错参数不对的问题。 在 2020-06-01 11:01:34,"Dian Fu" <[hidden email]> 写道: >The input types should be as following: > >input_types=[DataTypes.STRING(), DataTypes.ARRAY(DataTypes.STRING())] > >Regards, >Dian > >> 在 2020年6月1日,上午10:49,刘亚坤 <[hidden email]> 写道: >> >> 目前在学习使用pyflink的Table api,请教一个pyflink udf函数报错问题: >> >> @udf(input_types=[DataTypes.STRING()], result_type=DataTypes.STRING()) >> def drop_fields(message, *fields): >> import json >> message = json.loads(message) >> for field in fields: >> message.pop(field) >> return json.dumps(message) >> >> >> st_env \ >> .form_path("source") \ >> .select("drop_fields(message,'x')") \ >> .insert_into("sink") >> >> message 格式: >> {“a”:"1","x","2"} >> >> 报错参数类型不匹配: >> Actual:(java.lang.String, java.lang.String) >> Expected:(org.apache.flink.table.dataformat.BinaryString) >> >> 新手入门,请多指教,感谢。 > |
你传的第二个参数是string,这样试一下?
select("drop_fields(message, array('x'))") 不太确定我是否理解了你的问题(如果上面不行的话,建议你发一下exception) > 在 2020年6月1日,下午1:59,jack <[hidden email]> 写道: > > > > > > > > 是的,对应参数没有填写正确,感谢; > 另外请教,udf函数是不是目前不支持python的可变参数,我使用可变参数依然会报错参数不对的问题。 > > > > > > > > > > > > 在 2020-06-01 11:01:34,"Dian Fu" <[hidden email]> 写道: >> The input types should be as following: >> >> input_types=[DataTypes.STRING(), DataTypes.ARRAY(DataTypes.STRING())] >> >> Regards, >> Dian >> >>> 在 2020年6月1日,上午10:49,刘亚坤 <[hidden email]> 写道: >>> >>> 目前在学习使用pyflink的Table api,请教一个pyflink udf函数报错问题: >>> >>> @udf(input_types=[DataTypes.STRING()], result_type=DataTypes.STRING()) >>> def drop_fields(message, *fields): >>> import json >>> message = json.loads(message) >>> for field in fields: >>> message.pop(field) >>> return json.dumps(message) >>> >>> >>> st_env \ >>> .form_path("source") \ >>> .select("drop_fields(message,'x')") \ >>> .insert_into("sink") >>> >>> message 格式: >>> {“a”:"1","x","2"} >>> >>> 报错参数类型不匹配: >>> Actual:(java.lang.String, java.lang.String) >>> Expected:(org.apache.flink.table.dataformat.BinaryString) >>> >>> 新手入门,请多指教,感谢。 >> |
您理解的是对的,我测试了下,好像pyflink的udf函数不太支持python的可变参数 在 2020-06-01 14:47:21,"Dian Fu" <[hidden email]> 写道: >你传的第二个参数是string,这样试一下? >select("drop_fields(message, array('x'))") > >不太确定我是否理解了你的问题(如果上面不行的话,建议你发一下exception) > >> 在 2020年6月1日,下午1:59,jack <[hidden email]> 写道: >> >> >> >> >> >> >> >> 是的,对应参数没有填写正确,感谢; >> 另外请教,udf函数是不是目前不支持python的可变参数,我使用可变参数依然会报错参数不对的问题。 >> >> >> >> >> >> >> >> >> >> >> >> 在 2020-06-01 11:01:34,"Dian Fu" <[hidden email]> 写道: >>> The input types should be as following: >>> >>> input_types=[DataTypes.STRING(), DataTypes.ARRAY(DataTypes.STRING())] >>> >>> Regards, >>> Dian >>> >>>> 在 2020年6月1日,上午10:49,刘亚坤 <[hidden email]> 写道: >>>> >>>> 目前在学习使用pyflink的Table api,请教一个pyflink udf函数报错问题: >>>> >>>> @udf(input_types=[DataTypes.STRING()], result_type=DataTypes.STRING()) >>>> def drop_fields(message, *fields): >>>> import json >>>> message = json.loads(message) >>>> for field in fields: >>>> message.pop(field) >>>> return json.dumps(message) >>>> >>>> >>>> st_env \ >>>> .form_path("source") \ >>>> .select("drop_fields(message,'x')") \ >>>> .insert_into("sink") >>>> >>>> message 格式: >>>> {“a”:"1","x","2"} >>>> >>>> 报错参数类型不匹配: >>>> Actual:(java.lang.String, java.lang.String) >>>> Expected:(org.apache.flink.table.dataformat.BinaryString) >>>> >>>> 新手入门,请多指教,感谢。 >>> |
Free forum by Nabble | Edit this page |