各位大佬好,初学pyflink,有一个问题需要帮忙解决下。
代码为: from pyflink.table import StreamTableEnvironment, DataTypes, EnvironmentSettings,TableConfig,BatchTableEnvironment from pyflink.table.descriptors import Schema, OldCsv, FileSystem from pyflink.table.udf import udf from pyflink.datastream import StreamExecutionEnvironment elements = 'aaa|bbb' env = StreamExecutionEnvironment.get_execution_environment() env.set_parallelism(1) t_env = StreamTableEnvironment.create(env, environment_settings=EnvironmentSettings.new_instance().use_blink_planner().build()) @udf(input_types=[DataTypes.STRING()], result_type=DataTypes.ARRAY(DataTypes.STRING())) def split(x): return x.strip().split("|") # t_env.register_function("split", udf(lambda i: i.strip().split("|"), [DataTypes.STRING()], DataTypes.ARRAY(DataTypes.STRING()))) t_env.register_function("split", split) #split拆分后为一个2元数组 @udf(input_types=[DataTypes.ARRAY(DataTypes.STRING()), DataTypes.INT()], result_type=DataTypes.STRING()) def get(array, index): return array[index] @udf(input_types=[DataTypes.ARRAY(DataTypes.STRING())], result_type=DataTypes.STRING()) def convert(array): return get(array, 0) t_env.register_function("convert", convert) t_env.connect(FileSystem().path('/tmp/output')) \ .with_format(OldCsv() .field('b', DataTypes.STRING())) \ .with_schema(Schema() .field('b', DataTypes.STRING())) \ .create_temporary_table('mySink') t_env.from_elements(elements)\ .alias('line')\ .select('split(line)')\ .alias('array')\ .select('convert(array) as b')\ .insert_into('mySink')\ .t_env.execute("convert_job") 报错为: TypeError: 'UserDefinedFunctionWrapper' object is not callable |
Hi, 小学生。
把函数get的标签udf给去掉,它只是普通的Python函数,不要加上@udf,加上之后就不是python的函数了。只有Python的UDF你才要加上@udf Best, Xingbo 小学生 <[hidden email]> 于2020年6月4日周四 下午2:46写道: > 各位大佬好,初学pyflink,有一个问题需要帮忙解决下。 > > > 代码为: > from pyflink.table import StreamTableEnvironment, DataTypes, > EnvironmentSettings,TableConfig,BatchTableEnvironment > from pyflink.table.descriptors import Schema, OldCsv, FileSystem > from pyflink.table.udf import udf > from pyflink.datastream import StreamExecutionEnvironment > elements = 'aaa|bbb' > env = StreamExecutionEnvironment.get_execution_environment() > env.set_parallelism(1) > t_env = StreamTableEnvironment.create(env, > environment_settings=EnvironmentSettings.new_instance().use_blink_planner().build()) > > > @udf(input_types=[DataTypes.STRING()], > result_type=DataTypes.ARRAY(DataTypes.STRING())) > def split(x): > return x.strip().split("|") > # t_env.register_function("split", udf(lambda i: i.strip().split("|"), > [DataTypes.STRING()], DataTypes.ARRAY(DataTypes.STRING()))) > t_env.register_function("split", split) > #split拆分后为一个2元数组 > @udf(input_types=[DataTypes.ARRAY(DataTypes.STRING()), DataTypes.INT()], > result_type=DataTypes.STRING()) > def get(array, index): > return array[index] > > > @udf(input_types=[DataTypes.ARRAY(DataTypes.STRING())], > result_type=DataTypes.STRING()) > def convert(array): > return get(array, 0) > > > t_env.register_function("convert", convert) > > > t_env.connect(FileSystem().path('/tmp/output')) \ > .with_format(OldCsv() > .field('b', > DataTypes.STRING())) \ > .with_schema(Schema() > .field('b', > DataTypes.STRING())) \ > .create_temporary_table('mySink') > > > t_env.from_elements(elements)\ > .alias('line')\ > .select('split(line)')\ > .alias('array')\ > .select('convert(array) as b')\ > .insert_into('mySink')\ > .t_env.execute("convert_job") > > 报错为: > TypeError: 'UserDefinedFunctionWrapper' object is not callable |
大佬好,去掉了运行还是出错,一样的错误
|
Hi, 小学生
我稍微修改了一下你的code(你的from_elements那样写按理说就没法运行) code是能够正确运行的,你可以参考一下,你去掉的是不是有问题,或者你把你修改后的代码贴上来,再一起看看 from pyflink.table import StreamTableEnvironment, DataTypes, EnvironmentSettings, TableConfig, \ BatchTableEnvironment from pyflink.table.descriptors import Schema, OldCsv, FileSystem from pyflink.table.udf import udf from pyflink.datastream import StreamExecutionEnvironment def test_test(): elements = 'aaa|bbb' env = StreamExecutionEnvironment.get_execution_environment() env.set_parallelism(1) t_env = StreamTableEnvironment.create(env, environment_settings=EnvironmentSettings.new_instance().use_blink_planner().build()) @udf(input_types=[DataTypes.STRING()], result_type=DataTypes.ARRAY(DataTypes.STRING())) def split(x): print(x.strip().split("|")) return x.strip().split("|") # t_env.register_function("split", udf(lambda i: i.strip().split("|"), [DataTypes.STRING()], DataTypes.ARRAY(DataTypes.STRING()))) t_env.register_function("split", split) # split拆分后为一个2元数组 def get(arr, index): return arr[index] @udf(input_types=[DataTypes.ARRAY(DataTypes.STRING())], result_type=DataTypes.STRING()) def convert(arr): return get(arr, 0) t_env.register_function("convert", convert) t_env.connect(FileSystem().path('/tmp/output')) \ .with_format(OldCsv() .field('b', DataTypes.STRING())) \ .with_schema(Schema() .field('b', DataTypes.STRING())) \ .create_temporary_table('mySink') t_env.from_elements([(elements,), ], ["a"]) \ .alias('line') \ .select('split(line) as arr') \ .select('convert(arr) as b') \ .insert_into('mySink') t_env.execute("convert_job") if __name__ == '__main__': test_test() Best, Xingbo 小学生 <[hidden email]> 于2020年6月4日周四 下午3:38写道: > 大佬好,去掉了运行还是出错,一样的错误 |
Free forum by Nabble | Edit this page |