使用pyflink的table api 能否根据 filed的值进行判断插入到不同的sink表中?
场景:使用pyflink通过filter进行条件过滤后插入到sink中, 比如以下两条消息,logType不同,可以使用filter接口进行过滤后插入到sink表中: { "logType":"syslog", "message":"sla;flkdsjf" } { "logType":"alarm", "message":"sla;flkdsjf" } t_env.from_path("source")\ .filter("logType=syslog")\ .insert_into("sink1") 有方法直接在上面的代码中通过判断logType字段的类型实现类似if else的逻辑吗: if logType=="syslog": insert_into(sink1) elif logType=="alarm": insert_into(sink2) 如果insert_into 有.filter .select等接口的返回值的话就好办了,可以接着往下通过filter进行判断,代码类似以下: t_env.from_path("source")\ .filter("logType=syslog")\ .insert_into("sink1")\ .filter("logType=alarm")\ .insert_into("sink2") 请各位大牛指点,感谢 |
测试使用如下结构: table= t_env.from_path("source") if table.filter("logType=syslog"): table.filter("logType=syslog").insert_into("sink1") elif table.filter("logType=alarm"): table.filter("logType=alarm").insert_into("sink2") 我测试了下,好像table.filter("logType=syslog").insert_into("sink1")生效,下面的elif不生效,原因是table.filter("logType=syslog")或者table.where在做条件判断的同时已经将数据进行过滤,走不到下面的分支?? 在 2020-06-19 10:08:25,"jack" <[hidden email]> 写道: >使用pyflink的table api 能否根据 filed的值进行判断插入到不同的sink表中? > > >场景:使用pyflink通过filter进行条件过滤后插入到sink中, >比如以下两条消息,logType不同,可以使用filter接口进行过滤后插入到sink表中: >{ > "logType":"syslog", > "message":"sla;flkdsjf" >} >{ > "logType":"alarm", > "message":"sla;flkdsjf" >} > t_env.from_path("source")\ > .filter("logType=syslog")\ > .insert_into("sink1") >有方法直接在上面的代码中通过判断logType字段的类型实现类似if else的逻辑吗: >if logType=="syslog": > insert_into(sink1) >elif logType=="alarm": > insert_into(sink2) > > >如果insert_into 有.filter .select等接口的返回值的话就好办了,可以接着往下通过filter进行判断,代码类似以下: > > > t_env.from_path("source")\ > .filter("logType=syslog")\ > .insert_into("sink1")\ > .filter("logType=alarm")\ > .insert_into("sink2") >请各位大牛指点,感谢 > > > > > |
您好,jack:
Table API 不用 if/else 直接用类似逻辑即可: val t1 = table.filter('x > 2).groupBy(..) val t2 = table.filter('x <= 2).groupBy(..) t1.insert_into("sink1) t2.insert_into("sink2") Best, Jincheng jack <[hidden email]> 于2020年6月19日周五 上午10:35写道: > > 测试使用如下结构: > table= t_env.from_path("source") > > if table.filter("logType=syslog"): > table.filter("logType=syslog").insert_into("sink1") > elif table.filter("logType=alarm"): > table.filter("logType=alarm").insert_into("sink2") > > > 我测试了下,好像table > .filter("logType=syslog").insert_into("sink1")生效,下面的elif不生效,原因是 > table.filter("logType=syslog")或者table.where在做条件判断的同时已经将数据进行过滤,走不到下面的分支?? > > > > > 在 2020-06-19 10:08:25,"jack" <[hidden email]> 写道: > >使用pyflink的table api 能否根据 filed的值进行判断插入到不同的sink表中? > > > > > >场景:使用pyflink通过filter进行条件过滤后插入到sink中, > >比如以下两条消息,logType不同,可以使用filter接口进行过滤后插入到sink表中: > >{ > > "logType":"syslog", > > "message":"sla;flkdsjf" > >} > >{ > > "logType":"alarm", > > "message":"sla;flkdsjf" > >} > > t_env.from_path("source")\ > > .filter("logType=syslog")\ > > .insert_into("sink1") > >有方法直接在上面的代码中通过判断logType字段的类型实现类似if else的逻辑吗: > >if logType=="syslog": > > insert_into(sink1) > >elif logType=="alarm": > > insert_into(sink2) > > > > > >如果insert_into 有.filter .select等接口的返回值的话就好办了,可以接着往下通过filter进行判断,代码类似以下: > > > > > > t_env.from_path("source")\ > > .filter("logType=syslog")\ > > .insert_into("sink1")\ > > .filter("logType=alarm")\ > > .insert_into("sink2") > >请各位大牛指点,感谢 > > > > > > > > > > > > |
您好,jincheng老师,我已经验证了您提供的这种分开处理的逻辑,可以解决我的问题,非常感谢您的解惑
Best, Jack 在 2020-06-22 14:28:04,"jincheng sun" <[hidden email]> 写道: 您好,jack: Table API 不用 if/else 直接用类似逻辑即可: val t1 = table.filter('x > 2).groupBy(..) val t2 = table.filter('x <= 2).groupBy(..) t1.insert_into("sink1) t2.insert_into("sink2") Best, Jincheng jack <[hidden email]> 于2020年6月19日周五 上午10:35写道: 测试使用如下结构: table= t_env.from_path("source") if table.filter("logType=syslog"): table.filter("logType=syslog").insert_into("sink1") elif table.filter("logType=alarm"): table.filter("logType=alarm").insert_into("sink2") 我测试了下,好像table.filter("logType=syslog").insert_into("sink1")生效,下面的elif不生效,原因是table.filter("logType=syslog")或者table.where在做条件判断的同时已经将数据进行过滤,走不到下面的分支?? 在 2020-06-19 10:08:25,"jack" <[hidden email]> 写道: >使用pyflink的table api 能否根据 filed的值进行判断插入到不同的sink表中? > > >场景:使用pyflink通过filter进行条件过滤后插入到sink中, >比如以下两条消息,logType不同,可以使用filter接口进行过滤后插入到sink表中: >{ > "logType":"syslog", > "message":"sla;flkdsjf" >} >{ > "logType":"alarm", > "message":"sla;flkdsjf" >} > t_env.from_path("source")\ > .filter("logType=syslog")\ > .insert_into("sink1") >有方法直接在上面的代码中通过判断logType字段的类型实现类似if else的逻辑吗: >if logType=="syslog": > insert_into(sink1) >elif logType=="alarm": > insert_into(sink2) > > >如果insert_into 有.filter .select等接口的返回值的话就好办了,可以接着往下通过filter进行判断,代码类似以下: > > > t_env.from_path("source")\ > .filter("logType=syslog")\ > .insert_into("sink1")\ > .filter("logType=alarm")\ > .insert_into("sink2") >请各位大牛指点,感谢 > > > > > |
Free forum by Nabble | Edit this page |