请教各位,我这边使用pyflink 消费kafka json字符串数据发送到kafka, 把kafka消息按照一个字段来处理,
数据输入: {"topic": "logSource", "message": "x=1,y=1,z=1"} 发送到kafka里面的数据结果如下: "{""topic"": ""logSource"", ""message"": ""x=1,y=1,z=1""}" 又被双引号包了一层, 我的udf函数中使用的是 json模块进行处理,请各位帮我看一下怎么样转成正常的json串。 @udf(input_types=[DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING()], result_type=DataTypes.STRING()) defkv(log, pair_sep=',', kv_sep='='): import json log = json.loads(log) ret = {} items = re.split(pair_sep, log.get("message")) for item in items: ret[re.split(kv_sep, item)[0]] = re.split(kv_sep, item)[1] log.update(ret) log = json.dumps(log) return log defregister_source(st_env): st_env \ .connect( # declare the external system to connect to Kafka() .version("0.10") .topic("logSource") .start_from_latest() .property("zookeeper.connect", "localhost:2181") .property("bootstrap.servers", "localhost:9092")) \ .with_format( # declare a format for this system Csv() .schema(DataTypes.ROW([DataTypes.FIELD("log", DataTypes.STRING())])) .field_delimiter("\n")) \ .with_schema( # declare the schema of the table Schema() .field("log", DataTypes.STRING())) \ .in_append_mode() \ .register_table_source("source") defregister_sink(st_env): st_env.connect( Kafka() .version("0.10") .topic("logSink") .start_from_earliest() .property("zookeeper.connect", "localhost:2181") .property("bootstrap.servers", "localhost:9092")) \ .with_format( # declare a format for this system Csv() .schema(DataTypes.ROW([DataTypes.FIELD("log", DataTypes.STRING())]))) \ .with_schema( # declare the schema of the table Schema() .field("log", DataTypes.STRING())) \ .in_append_mode() \ .register_table_sink("sink") if __name__ == '__main__': s_env = StreamExecutionEnvironment.get_execution_environment() s_env.set_stream_time_characteristic(TimeCharacteristic.EventTime) s_env.set_parallelism(1) st_env = StreamTableEnvironment \ .create(s_env, environment_settings=EnvironmentSettings .new_instance() .in_streaming_mode() .use_blink_planner().build()) st_env.register_function('e_kv', e_kv) register_source(st_env) register_sink(st_env) st_env \ .from_path("source") \ .select("kv(log,',', '=') as log") \ .insert_into("sink") \ st_env.execute("test") |
Hi,
其实这个是CSV connector的一个可选的 quote_character参数的效果,默认值是",所以你每个字段都会这样外面包一层,你可以配置一下这个参数为\0,就可以得到你要的效果了。 st_env.connect( Kafka() .version("0.11") .topic("logSink") .start_from_earliest() .property("zookeeper.connect", "localhost:2181") .property("bootstrap.servers", "localhost:9092")) \ .with_format( # declare a format for this system Csv() .schema(DataTypes.ROW([DataTypes.FIELD("log", DataTypes.STRING())])) .quote_character("\0") ) \ .with_schema( # declare the schema of the table Schema() .field("log", DataTypes.STRING())) \ .in_append_mode() \ .register_table_sink("sink") Best, Xingbo jack <[hidden email]> 于2020年6月1日周一 下午5:31写道: > *请教各位,我这边使用pyflink 消费kafka json字符串数据发送到kafka, 把kafka消息按照一个字段来处理,* > > *数据输入:* > {"topic": "logSource", "message": "x=1,y=1,z=1"} > > 发送到kafka里面的数据结果如下: > "{""topic"": ""logSource"", ""message"": ""x=1,y=1,z=1""}" > > *又被双引号包了一层, 我的udf函数中使用的是 json模块进行处理,请各位帮我看一下怎么样转成正常的json串。* > > @udf(input_types=[DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING()], result_type=DataTypes.STRING()) > def kv(log, pair_sep=',', kv_sep='='): > import json > log = json.loads(log) > ret = {} > items = re.split(pair_sep, log.get("message")) > for item in items: > ret[re.split(kv_sep, item)[0]] = re.split(kv_sep, item)[1] > log.update(ret) > log = json.dumps(log) > return log > > > def register_source(st_env): > st_env \ > .connect( # declare the external system to connect to > Kafka() > .version("0.10") > .topic("logSource") > .start_from_latest() > .property("zookeeper.connect", "localhost:2181") > .property("bootstrap.servers", "localhost:9092")) \ > .with_format( # declare a format for this system > Csv() > .schema(DataTypes.ROW([DataTypes.FIELD("log", DataTypes.STRING())])) > .field_delimiter("\n")) \ > .with_schema( # declare the schema of the table > Schema() > .field("log", DataTypes.STRING())) \ > .in_append_mode() \ > .register_table_source("source") > > def register_sink(st_env): > st_env.connect( > Kafka() > .version("0.10") > .topic("logSink") > .start_from_earliest() > .property("zookeeper.connect", "localhost:2181") > .property("bootstrap.servers", "localhost:9092")) \ > .with_format( # declare a format for this system > Csv() > .schema(DataTypes.ROW([DataTypes.FIELD("log", DataTypes.STRING())]))) \ > .with_schema( # declare the schema of the table > Schema() > .field("log", DataTypes.STRING())) \ > .in_append_mode() \ > .register_table_sink("sink") > > if __name__ == '__main__': > > s_env = StreamExecutionEnvironment.get_execution_environment() > s_env.set_stream_time_characteristic(TimeCharacteristic.EventTime) > s_env.set_parallelism(1) > st_env = StreamTableEnvironment \ > .create(s_env, environment_settings=EnvironmentSettings > .new_instance() > .in_streaming_mode() > .use_blink_planner().build()) > st_env.register_function('e_kv', e_kv) > register_source(st_env) > register_sink(st_env) > st_env \ > .from_path("source") \ > .select("kv(log,',', '=') as log") \ > .insert_into("sink") \ > st_env.execute("test") > > > |
非常感谢解惑,刚开始使用pyflink,不是很熟悉,还请多多指教
在 2020-06-01 20:50:53,"Xingbo Huang" <[hidden email]> 写道: Hi, 其实这个是CSV connector的一个可选的quote_character参数的效果,默认值是",所以你每个字段都会这样外面包一层,你可以配置一下这个参数为\0,就可以得到你要的效果了。 st_env.connect( Kafka() .version("0.11") .topic("logSink") .start_from_earliest() .property("zookeeper.connect", "localhost:2181") .property("bootstrap.servers", "localhost:9092")) \ .with_format( # declare a format for this system Csv() .schema(DataTypes.ROW([DataTypes.FIELD("log", DataTypes.STRING())])) .quote_character("\0") ) \ .with_schema( # declare the schema of the table Schema() .field("log", DataTypes.STRING())) \ .in_append_mode() \ .register_table_sink("sink") Best, Xingbo |
客气客气,互相交流学习😀
Best, Xingbo jack <[hidden email]> 于2020年6月1日周一 下午9:07写道: > 非常感谢解惑,刚开始使用pyflink,不是很熟悉,还请多多指教 > > > > > > > 在 2020-06-01 20:50:53,"Xingbo Huang" <[hidden email]> 写道: > > Hi, > 其实这个是CSV connector的一个可选的 > quote_character参数的效果,默认值是",所以你每个字段都会这样外面包一层,你可以配置一下这个参数为\0,就可以得到你要的效果了。 > st_env.connect( > Kafka() > .version("0.11") > .topic("logSink") > .start_from_earliest() > .property("zookeeper.connect", "localhost:2181") > .property("bootstrap.servers", "localhost:9092")) \ > .with_format( # declare a format for this system > Csv() > .schema(DataTypes.ROW([DataTypes.FIELD("log", > DataTypes.STRING())])) > .quote_character("\0") > ) \ > .with_schema( # declare the schema of the table > Schema() > .field("log", DataTypes.STRING())) \ > .in_append_mode() \ > .register_table_sink("sink") > > Best, > Xingbo > >> >> >> |
In reply to this post by jack
问题请教:
描述: pyflink 从source通过sql对数据进行查询聚合等操作 不输出到sink中,而是可以直接作为结果,我这边可以通过开发web接口直接查询这个结果,不必去sink中进行查询。 flink能否实现这样的方式? 感谢 |
可以用zeppelin的z.show 来查询job结果。这边有pyflink在zeppelin上的入门教程
https://www.bilibili.com/video/BV1Te411W73b?p=20 可以加入钉钉群讨论:30022475 jack <[hidden email]> 于2020年6月9日周二 下午5:28写道: > 问题请教: > 描述: pyflink 从source通过sql对数据进行查询聚合等操作 > 不输出到sink中,而是可以直接作为结果,我这边可以通过开发web接口直接查询这个结果,不必去sink中进行查询。 > > flink能否实现这样的方式? > 感谢 > -- Best Regards Jeff Zhang |
你好 Jack,
> pyflink 从source通过sql对数据进行查询聚合等操作 不输出到sink中,而是可以直接作为结果, 我这边可以通过开发web接口直接查询这个结果,不必去sink中进行查询 我理解你上面说的 【直接作为结果】+ 【web接口查询】已经包含了“sink”的动作。只是这个“sink” 是这样的实现而已。对于您的场景: 1. 如果您想直接将结果不落地(不存储)执行推送的 web页面,可以自定义一个Web Socket的Sink。 2. 如果您不是想直接推送到web页面,而是通过查询拉取结果,那么您上面说的 【直接作为结果】这句话就要描述一下,您想怎样作为结果?我理解是要落盘的(持久化),所以这里持久化本质也是一个sink。Flink可以支持很多中sink,比如:数据库,文件系统,消息队列等等。您可以参考官方文档: https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html 如果上面回复 没有解决你的问题,欢迎随时反馈~~ Best, Jincheng Jeff Zhang <[hidden email]> 于2020年6月9日周二 下午5:39写道: > 可以用zeppelin的z.show 来查询job结果。这边有pyflink在zeppelin上的入门教程 > https://www.bilibili.com/video/BV1Te411W73b?p=20 > 可以加入钉钉群讨论:30022475 > > > > jack <[hidden email]> 于2020年6月9日周二 下午5:28写道: > >> 问题请教: >> 描述: pyflink 从source通过sql对数据进行查询聚合等操作 >> 不输出到sink中,而是可以直接作为结果,我这边可以通过开发web接口直接查询这个结果,不必去sink中进行查询。 >> >> flink能否实现这样的方式? >> 感谢 >> > > > -- > Best Regards > > Jeff Zhang > |
hi jack,jincheng
Flink 1.11 支持直接将select的结果collect到本地,例如: CloseableIterator<Row> it = tEnv.executeSql("select ...").collect(); while(it.hasNext()) { it.next() .... } 但是 pyflink 还没有引入 collect() 接口。(后续会完善?@jincheng) 但是1.11的TableResult#collect实现对流的query支持不完整(只支持append only的query),master已经完整支持。 可以参照 jincheng 的意见,(或者结合 TableResult#collect 的实现),完成一个自己的 sink 也可以。 Best, Godfrey jincheng sun <[hidden email]> 于2020年6月15日周一 下午4:14写道: > 你好 Jack, > > > pyflink 从source通过sql对数据进行查询聚合等操作 不输出到sink中,而是可以直接作为结果, > 我这边可以通过开发web接口直接查询这个结果,不必去sink中进行查询 > > 我理解你上面说的 【直接作为结果】+ 【web接口查询】已经包含了“sink”的动作。只是这个“sink” 是这样的实现而已。对于您的场景: > 1. 如果您想直接将结果不落地(不存储)执行推送的 web页面,可以自定义一个Web Socket的Sink。 > 2. 如果您不是想直接推送到web页面,而是通过查询拉取结果,那么您上面说的 > 【直接作为结果】这句话就要描述一下,您想怎样作为结果?我理解是要落盘的(持久化),所以这里持久化本质也是一个sink。Flink可以支持很多中sink,比如:数据库,文件系统,消息队列等等。您可以参考官方文档: > > https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html > > 如果上面回复 没有解决你的问题,欢迎随时反馈~~ > > Best, > Jincheng > > > > Jeff Zhang <[hidden email]> 于2020年6月9日周二 下午5:39写道: > >> 可以用zeppelin的z.show 来查询job结果。这边有pyflink在zeppelin上的入门教程 >> https://www.bilibili.com/video/BV1Te411W73b?p=20 >> 可以加入钉钉群讨论:30022475 >> >> >> >> jack <[hidden email]> 于2020年6月9日周二 下午5:28写道: >> >>> 问题请教: >>> 描述: pyflink 从source通过sql对数据进行查询聚合等操作 >>> 不输出到sink中,而是可以直接作为结果,我这边可以通过开发web接口直接查询这个结果,不必去sink中进行查询。 >>> >>> flink能否实现这样的方式? >>> 感谢 >>> >> >> >> -- >> Best Regards >> >> Jeff Zhang >> > |
In reply to this post by jincheng sun
感谢您的建议,目前在学习使用pyflink,使用pyflink做各种有趣的尝试,包括udf函数做日志解析等,也看过
目前官方文档对于pyflink的文档和例子还是偏少,遇到问题了还是需要向各位大牛们多多请教。 Best, Jack 在 2020-06-15 16:13:32,"jincheng sun" <[hidden email]> 写道: >你好 Jack, > >> pyflink 从source通过sql对数据进行查询聚合等操作 不输出到sink中,而是可以直接作为结果, >我这边可以通过开发web接口直接查询这个结果,不必去sink中进行查询 > >我理解你上面说的 【直接作为结果】+ 【web接口查询】已经包含了“sink”的动作。只是这个“sink” 是这样的实现而已。对于您的场景: >1. 如果您想直接将结果不落地(不存储)执行推送的 web页面,可以自定义一个Web Socket的Sink。 >2. 如果您不是想直接推送到web页面,而是通过查询拉取结果,那么您上面说的 >【直接作为结果】这句话就要描述一下,您想怎样作为结果?我理解是要落盘的(持久化),所以这里持久化本质也是一个sink。Flink可以支持很多中sink,比如:数据库,文件系统,消息队列等等。您可以参考官方文档: >https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html > >如果上面回复 没有解决你的问题,欢迎随时反馈~~ > >Best, >Jincheng > > > >Jeff Zhang <[hidden email]> 于2020年6月9日周二 下午5:39写道: > >> 可以用zeppelin的z.show 来查询job结果。这边有pyflink在zeppelin上的入门教程 >> https://www.bilibili.com/video/BV1Te411W73b?p=20 >> 可以加入钉钉群讨论:30022475 >> >> >> >> jack <[hidden email]> 于2020年6月9日周二 下午5:28写道: >> >>> 问题请教: >>> 描述: pyflink 从source通过sql对数据进行查询聚合等操作 >>> 不输出到sink中,而是可以直接作为结果,我这边可以通过开发web接口直接查询这个结果,不必去sink中进行查询。 >>> >>> flink能否实现这样的方式? >>> 感谢 >>> >> >> >> -- >> Best Regards >> >> Jeff Zhang >> |
In reply to this post by godfrey he
hi
感谢您的建议,我这边尝试一下自定义实现sink的方式。 Best, Jack 在 2020-06-15 18:08:15,"godfrey he" <[hidden email]> 写道: hi jack,jincheng Flink 1.11 支持直接将select的结果collect到本地,例如: CloseableIterator<Row> it = tEnv.executeSql("select ...").collect(); while(it.hasNext()) { it.next() .... } 但是 pyflink 还没有引入 collect() 接口。(后续会完善?@jincheng) 但是1.11的TableResult#collect实现对流的query支持不完整(只支持append only的query),master已经完整支持。 可以参照 jincheng 的意见,(或者结合 TableResult#collect 的实现),完成一个自己的 sink 也可以。 Best, Godfrey jincheng sun <[hidden email]> 于2020年6月15日周一 下午4:14写道: 你好 Jack, > pyflink 从source通过sql对数据进行查询聚合等操作 不输出到sink中,而是可以直接作为结果,我这边可以通过开发web接口直接查询这个结果,不必去sink中进行查询 我理解你上面说的 【直接作为结果】+ 【web接口查询】已经包含了“sink”的动作。只是这个“sink” 是这样的实现而已。对于您的场景: 1. 如果您想直接将结果不落地(不存储)执行推送的 web页面,可以自定义一个Web Socket的Sink。 2. 如果您不是想直接推送到web页面,而是通过查询拉取结果,那么您上面说的 【直接作为结果】这句话就要描述一下,您想怎样作为结果?我理解是要落盘的(持久化),所以这里持久化本质也是一个sink。Flink可以支持很多中sink,比如:数据库,文件系统,消息队列等等。您可以参考官方文档: https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html 如果上面回复 没有解决你的问题,欢迎随时反馈~~ Best, Jincheng Jeff Zhang <[hidden email]> 于2020年6月9日周二 下午5:39写道: 可以用zeppelin的z.show 来查询job结果。这边有pyflink在zeppelin上的入门教程 https://www.bilibili.com/video/BV1Te411W73b?p=20 可以加入钉钉群讨论:30022475 jack <[hidden email]> 于2020年6月9日周二 下午5:28写道: 问题请教: 描述: pyflink 从source通过sql对数据进行查询聚合等操作 不输出到sink中,而是可以直接作为结果,我这边可以通过开发web接口直接查询这个结果,不必去sink中进行查询。 flink能否实现这样的方式? 感谢 -- Best Regards Jeff Zhang |
Free forum by Nabble | Edit this page |