A question: I'm using Flink SQL to pull data from MySQL, and the MySQL source table has tens of millions of rows. Even a query like `select * from sourceTable limit 10;` causes memory to spike, no matter how few rows the LIMIT asks for. Is the LIMIT here applied only in the Flink TaskManager after scanning the MySQL source table, rather than being executed in MySQL itself?
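For context, a minimal sketch of the setup being described (table name, schema, and connection options are hypothetical; the connector options follow the Flink 1.11 JDBC connector):

```sql
-- Hypothetical JDBC source table backed by the large MySQL table.
CREATE TABLE sourceTable (
  id BIGINT,
  name STRING
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://localhost:3306/mydb',
  'table-name' = 'source_table',
  'username' = 'user',
  'password' = 'pass'
);

-- Without limit pushdown, the scan below reads the whole MySQL table
-- into Flink, and the LIMIT is applied inside the TaskManager.
SELECT * FROM sourceTable LIMIT 10;
```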
The LIMIT was probably not pushed down to MySQL for execution.
This looks similar to a problem I ran into: http://apache-flink.147419.n8.nabble.com/Flink-MySQL-td10374.html

--
Sent from: http://apache-flink.147419.n8.nabble.com/
hi, LIMIT push down was recently merged into the master branch.

[1] https://github.com/apache/flink/pull/13800
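A quick way to check whether the LIMIT actually reaches the source is to look at the query plan (a sketch; the exact plan text varies by Flink version):

```sql
-- On a build that includes the pushdown, the optimized plan should show
-- the limit inside the source scan, e.g. TableSourceScan(..., limit=[10]);
-- without it, a separate Limit operator sits above a full table scan.
EXPLAIN SELECT * FROM sourceTable LIMIT 10;
```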
OK, thanks everyone. That should be the problem; I'll merge it into my branch and verify.
In reply to this post by Shengkai Fang
flink run -py new_jdbc_source.py
Traceback (most recent call last):
  File "new_jdbc_source.py", line 66, in <module>
    st_env.execute_sql("select * from feature_bar_sink").print()
  File "/Users/derek.bao/dev-utils/flink-1.11.2/opt/python/pyflink.zip/pyflink/table/table_environment.py", line 543, in execute_sql
  File "/Users/derek.bao/dev-utils/flink-1.11.2/opt/python/py4j-0.10.8.1-src.zip/py4j/java_gateway.py", line 1286, in __call__
  File "/Users/derek.bao/dev-utils/flink-1.11.2/opt/python/pyflink.zip/pyflink/util/exceptions.py", line 147, in deco
  File "/Users/derek.bao/dev-utils/flink-1.11.2/opt/python/py4j-0.10.8.1-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o10.executeSql.
: java.lang.UnsupportedOperationException: Currently, a DynamicTableSource with SupportsLimitPushDown ability is not supported.
    at org.apache.flink.table.planner.plan.schema.CatalogSourceTable$$anonfun$validateTableSource$1.apply(CatalogSourceTable.scala:210)
    at org.apache.flink.table.planner.plan.schema.CatalogSourceTable$$anonfun$validateTableSource$1.apply(CatalogSourceTable.scala:208)
    at scala.collection.immutable.List.foreach(List.scala:392)
    at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.validateTableSource(CatalogSourceTable.scala:208)
    at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.buildTableScan(CatalogSourceTable.scala:142)
    at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.scala:78)
    at org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3492)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2415)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2102)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2051)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:661)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:642)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3345)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:568)
    at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:164)
    at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:151)
    at org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:789)
    at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:761)
    at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:238)
    at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:684)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
    at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
    at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)

org.apache.flink.client.program.ProgramAbortException
    at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:95)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
    at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
    at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699)
    at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232)
    at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)
    at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
    at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
    at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)

I tried cherry-picking these commits:
https://github.com/apache/flink/pull/13800/commits/ba4c6121faa50f3aa26b8c05bf7ea36b85d82642
and got the error above.
hi,
The error is: java.lang.UnsupportedOperationException: Currently, a DynamicTableSource with SupportsLimitPushDown ability is not supported.
If your current version is not 1.12, you also need to pick the rule [1]. You can follow this JIRA [2], which covers all of the SupportsXXX optimizations. If this is just for local testing, I'd still recommend using the released 1.12 plus the commit mentioned earlier; cherry-picking on your own may run into problems.

[1] https://github.com/apache/flink/pull/12964
[2] https://issues.apache.org/jira/browse/FLINK-16987
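Until a build with the pushdown is available, memory pressure on large JDBC scans can sometimes be reduced with the connector's scan options instead. A sketch (table, schema, and bounds are hypothetical; the option names are from the Flink 1.11 JDBC connector):

```sql
CREATE TABLE sourceTable (
  id BIGINT,
  name STRING
) WITH (
  'connector' = 'jdbc',
  -- Note: the MySQL driver typically needs useCursorFetch=true for
  -- fetch-size to actually stream rows instead of buffering them all.
  'url' = 'jdbc:mysql://localhost:3306/mydb?useCursorFetch=true',
  'table-name' = 'source_table',
  -- Read rows from MySQL in batches rather than materializing the
  -- whole result set in the JDBC driver:
  'scan.fetch-size' = '1000',
  -- Optionally split the scan into parallel range queries:
  'scan.partition.column' = 'id',
  'scan.partition.num' = '10',
  'scan.partition.lower-bound' = '1',
  'scan.partition.upper-bound' = '10000000'
);
```

This does not push the LIMIT down, but it keeps any single subtask from having to hold the whole table in memory.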