Flink 1.11里的org.apache.flink.table.planner.ParserImpl的parse方法里包含了对Planner相关方法的调用,这导致在某些前置sql(例如insert into用到的表的create table语句)没有执行之前,这个parse方法会报错。如果只是想调用Calcite的相关的功能去parse sql语句,有什么办法可以做到吗?能想到的一个办法是通过反射拿到ParserImpl里面的calciteParserSupplier。想知道Flink有没有提供直接的接口或者方法去做纯的sql parsing。
谢谢~ |
Hi,
我之前跟你有相同的需求,实现方式也跟你的思路基本类似, mock一个env 然后反射获取calciteParserSupplier 目前在生产环境运行良好 FYI ________________________________ 发件人: 马阳阳 <[hidden email]> 发送时间: 2020年10月19日 17:57:47 收件人: Flink中文邮件列表 主题: Flink 1.11里如何parse出未解析的执行计划 Flink 1.11里的org.apache.flink.table.planner.ParserImpl的parse方法里包含了对Planner相关方法的调用,这导致在某些前置sql(例如insert into用到的表的create table语句)没有执行之前,这个parse方法会报错。如果只是想调用Calcite的相关的功能去parse sql语句,有什么办法可以做到吗?能想到的一个办法是通过反射拿到ParserImpl里面的calciteParserSupplier。想知道Flink有没有提供直接的接口或者方法去做纯的sql parsing。 谢谢~ |
In reply to this post by 马阳阳
这里我当时也想要弄一下,不过失败了我。最后用了calcite来弄,你这块具体是如何去弄的?
------------------ 原始邮件 ------------------ 发件人: 马阳阳 <[hidden email]> 发送时间: 2020年10月19日 17:58 收件人: Flink中文邮件列表 <[hidden email]> 主题: 回复:Flink 1.11里如何parse出未解析的执行计划 Flink 1.11里的org.apache.flink.table.planner.ParserImpl的parse方法里包含了对Planner相关方法的调用,这导致在某些前置sql(例如insert into用到的表的create table语句)没有执行之前,这个parse方法会报错。如果只是想调用Calcite的相关的功能去parse sql语句,有什么办法可以做到吗?能想到的一个办法是通过反射拿到ParserImpl里面的calciteParserSupplier。想知道Flink有没有提供直接的接口或者方法去做纯的sql parsing。 谢谢~ |
In reply to this post by 马阳阳
我简单写了一下仅供参考
import org.apache.calcite.config.Lex; import org.apache.calcite.sql.SqlNode; import org.apache.calcite.sql.SqlNodeList; import org.apache.calcite.sql.parser.SqlParseException; import org.apache.calcite.sql.parser.SqlParser; import org.apache.flink.sql.parser.impl.FlinkSqlParserImpl; import org.apache.flink.sql.parser.validate.FlinkSqlConformance; /** * @author: silence * @date: 2020/10/22 */ public class Test { public static void main(String[] args) throws SqlParseException { String sql = "xxx"; SqlParser.Config sqlParserConfig = SqlParser .configBuilder() .setParserFactory(FlinkSqlParserImpl.FACTORY) .setConformance(FlinkSqlConformance.DEFAULT) .setLex(Lex.JAVA) .setIdentifierMaxLength(256) .build(); SqlParser sqlParser = SqlParser.create(sql, sqlParserConfig); SqlNodeList sqlNodes = sqlParser.parseStmtList(); for (SqlNode sqlNode : sqlNodes) { //do something } } } -- Sent from: http://apache-flink.147419.n8.nabble.com/ |
Hi,
我的做法如我所说,是用反射将parser拿出来的,比较hack但是很简单而且很稳妥 代码差不多就是下面这个样子 Flink version: custom version base on 1.11.x @PostConstruct private void setup() throws NoSuchFieldException, IllegalAccessException { final StreamTableEnvironmentImpl env = (StreamTableEnvironmentImpl) support.getStreamTableEnvironment(); final Field field = env.getParser().getClass().getDeclaredField("calciteParserSupplier"); field.setAccessible(true); // 普通的parser final Supplier<CalciteParser> defaultSupplier = (Supplier<CalciteParser>) field.get(env.getParser()); this.defaultSupplier = defaultSupplier; env.getConfig().setSqlDialect(SqlDialect.HIVE); final Field field2 = env.getParser().getClass().getDeclaredField("calciteParserSupplier"); field2.setAccessible(true); // hive的parser final Supplier<CalciteParser> hiveSupplier = (Supplier<CalciteParser>) field.get(env.getParser()); this.hiveSupplier = hiveSupplier; } WARN: 这种做法带来的比较不好的点是,项目的依赖会比较多。我这边恰好是一个在一个单独的且包含Flink依赖的服务中做的这件事从而规避了这个问题。 如果要参考上面这种做法的话,注意评估依赖的问题啦~ ________________________________ 发件人: silence <[hidden email]> 发送时间: 2020年10月22日 12:05:38 收件人: [hidden email] 主题: Re: Flink 1.11里如何parse出未解析的执行计划 我简单写了一下仅供参考 import org.apache.calcite.config.Lex; import org.apache.calcite.sql.SqlNode; import org.apache.calcite.sql.SqlNodeList; import org.apache.calcite.sql.parser.SqlParseException; import org.apache.calcite.sql.parser.SqlParser; import org.apache.flink.sql.parser.impl.FlinkSqlParserImpl; import org.apache.flink.sql.parser.validate.FlinkSqlConformance; /** * @author: silence * @date: 2020/10/22 */ public class Test { public static void main(String[] args) throws SqlParseException { String sql = "xxx"; SqlParser.Config sqlParserConfig = SqlParser .configBuilder() .setParserFactory(FlinkSqlParserImpl.FACTORY) .setConformance(FlinkSqlConformance.DEFAULT) .setLex(Lex.JAVA) .setIdentifierMaxLength(256) .build(); SqlParser sqlParser = SqlParser.create(sql, sqlParserConfig); SqlNodeList sqlNodes = sqlParser.parseStmtList(); for (SqlNode sqlNode : sqlNodes) { //do something } } } -- Sent from: http://apache-flink.147419.n8.nabble.com/ |
Free forum by Nabble | Edit this page |