代码如下:
// tEnv; tEnv.sqlUpdate("create table dr1( " + " cid STRING, " + " server_time BIGINT, " + " d MAP<STRING, STRING>, " + " process_time AS PROCTIME(), " + " event_time AS TO_TIMESTAMP(FROM_UNIXTIME(server_time / 1000)), " + " WATERMARK FOR event_time AS event_time - INTERVAL '60' SECOND " + ") WITH ( " + " 'update-mode' = 'append', " + " 'connector.type' = 'kafka', " + " 'connector.version' = 'universal', " + " 'connector.topic' = 'antibot_dr1', " + " 'connector.startup-mode' = 'latest-offset', " + " 'connector.properties.zookeeper.connect' = 'yq01-sw-xxx03.yq01:8681', " + " 'connector.properties.bootstrap.servers' = 'yq01-sw-xxx03.yq01:8192', " + " 'format.type' = 'json' " + ")"); Table t1 = tEnv.sqlQuery("select * from dr1"); 我打包会把flink-json打包进去,最终结果包是test.jar。 test.jar是个fat jar,相关依赖都有了。 然后我执行:flink run -c test.SQLWC1 --detached test.jar 报错: Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.DeserializationSchemaFactory' in the classpath. 可是我flink-json.jar都打包进去了,居然还是报错。。。 解决方式,必须是执行 flink run -c test.SQLWC1 --detached test.jar 这个命令的机器 上的flink的环境中有flink-json这个包。但实际上这个机器只作为提交,实际执行任务的集群是另一个机器。 搞不懂,FlinkSQL找依赖的过程到底啥情况,我fat jar打包进去的flink-json不会被考虑吗? |
可能是打fat
jar的时候service文件被覆盖了?可以检查一下你jar包里面META-INF/services下面的文件,看是不是没有json的实现 On Fri, Aug 14, 2020 at 7:13 PM 赵一旦 <[hidden email]> wrote: > 代码如下: > // tEnv; > tEnv.sqlUpdate("create table dr1( " + > " cid STRING, " + > " server_time BIGINT, " + > " d MAP<STRING, STRING>, " + > " process_time AS PROCTIME(), " + > " event_time AS TO_TIMESTAMP(FROM_UNIXTIME(server_time / 1000)), > " + > " WATERMARK FOR event_time AS event_time - INTERVAL '60' SECOND > " + > ") WITH ( " + > " 'update-mode' = 'append', " + > " 'connector.type' = 'kafka', " + > " 'connector.version' = 'universal', " + > " 'connector.topic' = 'antibot_dr1', " + > " 'connector.startup-mode' = 'latest-offset', " + > " 'connector.properties.zookeeper.connect' = > 'yq01-sw-xxx03.yq01:8681', " + > " 'connector.properties.bootstrap.servers' = > 'yq01-sw-xxx03.yq01:8192', " + > " 'format.type' = 'json' " + > ")"); > Table t1 = tEnv.sqlQuery("select * from dr1"); > > 我打包会把flink-json打包进去,最终结果包是test.jar。 > > test.jar是个fat jar,相关依赖都有了。 > > 然后我执行:flink run -c test.SQLWC1 --detached test.jar 报错: > > Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: > Could not find a suitable table factory for > 'org.apache.flink.table.factories.DeserializationSchemaFactory' in > the classpath. > > 可是我flink-json.jar都打包进去了,居然还是报错。。。 > > 解决方式,必须是执行 flink run -c test.SQLWC1 --detached test.jar 这个命令的机器 > > 上的flink的环境中有flink-json这个包。但实际上这个机器只作为提交,实际执行任务的集群是另一个机器。 > > 搞不懂,FlinkSQL找依赖的过程到底啥情况,我fat jar打包进去的flink-json不会被考虑吗? > -- Best regards! Rui Li |
@RuiLi,你这个说法对的。我刚刚看了下,是这样的。因为我是shade掉的,倒是没丢失,但是多种connector和format相互覆盖了。
所以说,这就是问题了。flink-json,flink-csv这种包没办法shade进去看样子? 此外,如果我实现一个jar内同时提供flink-json和flink-csv的功能是不是当前这种机制也无法实现(为什么要依靠services目录下一个固定名字的文件来记录Factory实现类,这貌似就是导致这个问题的点)。 Rui Li <[hidden email]> 于2020年8月17日周一 下午3:46写道: > 可能是打fat > jar的时候service文件被覆盖了?可以检查一下你jar包里面META-INF/services下面的文件,看是不是没有json的实现 > > On Fri, Aug 14, 2020 at 7:13 PM 赵一旦 <[hidden email]> wrote: > > > 代码如下: > > // tEnv; > > tEnv.sqlUpdate("create table dr1( " + > > " cid STRING, " + > > " server_time BIGINT, " + > > " d MAP<STRING, STRING>, " + > > " process_time AS PROCTIME(), " + > > " event_time AS TO_TIMESTAMP(FROM_UNIXTIME(server_time / 1000)), > > " + > > " WATERMARK FOR event_time AS event_time - INTERVAL '60' SECOND > > " + > > ") WITH ( " + > > " 'update-mode' = 'append', " + > > " 'connector.type' = 'kafka', " + > > " 'connector.version' = 'universal', " + > > " 'connector.topic' = 'antibot_dr1', " + > > " 'connector.startup-mode' = 'latest-offset', " + > > " 'connector.properties.zookeeper.connect' = > > 'yq01-sw-xxx03.yq01:8681', " + > > " 'connector.properties.bootstrap.servers' = > > 'yq01-sw-xxx03.yq01:8192', " + > > " 'format.type' = 'json' " + > > ")"); > > Table t1 = tEnv.sqlQuery("select * from dr1"); > > > > 我打包会把flink-json打包进去,最终结果包是test.jar。 > > > > test.jar是个fat jar,相关依赖都有了。 > > > > 然后我执行:flink run -c test.SQLWC1 --detached test.jar 报错: > > > > Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: > > Could not find a suitable table factory for > > 'org.apache.flink.table.factories.DeserializationSchemaFactory' in > > the classpath. > > > > 可是我flink-json.jar都打包进去了,居然还是报错。。。 > > > > 解决方式,必须是执行 flink run -c test.SQLWC1 --detached test.jar 这个命令的机器 > > > > 上的flink的环境中有flink-json这个包。但实际上这个机器只作为提交,实际执行任务的集群是另一个机器。 > > > > 搞不懂,FlinkSQL找依赖的过程到底啥情况,我fat jar打包进去的flink-json不会被考虑吗? > > > > > -- > Best regards! > Rui Li > |
小伙伴们,帮忙看下怎么解决呢? 通过写代码方式,基于table
api执行flink的sql。这种情况下用到的flink-json等包通过shade等方式做成一个大jar包之后依赖的问题。 赵一旦 <[hidden email]> 于2020年8月17日周一 下午5:00写道: > @RuiLi,你这个说法对的。我刚刚看了下,是这样的。因为我是shade掉的,倒是没丢失,但是多种connector和format相互覆盖了。 > > 所以说,这就是问题了。flink-json,flink-csv这种包没办法shade进去看样子? > > > 此外,如果我实现一个jar内同时提供flink-json和flink-csv的功能是不是当前这种机制也无法实现(为什么要依靠services目录下一个固定名字的文件来记录Factory实现类,这貌似就是导致这个问题的点)。 > > Rui Li <[hidden email]> 于2020年8月17日周一 下午3:46写道: > >> 可能是打fat >> jar的时候service文件被覆盖了?可以检查一下你jar包里面META-INF/services下面的文件,看是不是没有json的实现 >> >> On Fri, Aug 14, 2020 at 7:13 PM 赵一旦 <[hidden email]> wrote: >> >> > 代码如下: >> > // tEnv; >> > tEnv.sqlUpdate("create table dr1( " + >> > " cid STRING, " + >> > " server_time BIGINT, " + >> > " d MAP<STRING, STRING>, " + >> > " process_time AS PROCTIME(), " + >> > " event_time AS TO_TIMESTAMP(FROM_UNIXTIME(server_time / >> 1000)), >> > " + >> > " WATERMARK FOR event_time AS event_time - INTERVAL '60' SECOND >> > " + >> > ") WITH ( " + >> > " 'update-mode' = 'append', " + >> > " 'connector.type' = 'kafka', " + >> > " 'connector.version' = 'universal', " + >> > " 'connector.topic' = 'antibot_dr1', " + >> > " 'connector.startup-mode' = 'latest-offset', " + >> > " 'connector.properties.zookeeper.connect' = >> > 'yq01-sw-xxx03.yq01:8681', " + >> > " 'connector.properties.bootstrap.servers' = >> > 'yq01-sw-xxx03.yq01:8192', " + >> > " 'format.type' = 'json' " + >> > ")"); >> > Table t1 = tEnv.sqlQuery("select * from dr1"); >> > >> > 我打包会把flink-json打包进去,最终结果包是test.jar。 >> > >> > test.jar是个fat jar,相关依赖都有了。 >> > >> > 然后我执行:flink run -c test.SQLWC1 --detached test.jar 报错: >> > >> > Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: >> > Could not find a suitable table factory for >> > 'org.apache.flink.table.factories.DeserializationSchemaFactory' in >> > the classpath. >> > >> > 可是我flink-json.jar都打包进去了,居然还是报错。。。 >> > >> > 解决方式,必须是执行 flink run -c test.SQLWC1 --detached test.jar 这个命令的机器 >> > >> > 上的flink的环境中有flink-json这个包。但实际上这个机器只作为提交,实际执行任务的集群是另一个机器。 >> > >> > 搞不懂,FlinkSQL找依赖的过程到底啥情况,我fat jar打包进去的flink-json不会被考虑吗? >> > >> >> >> -- >> Best regards! >> Rui Li >> > |
In reply to this post by nobleyd
用shade plugin的时候可以指定service resource
transformer,应该能把多个service文件merge起来。具体可以参考: https://maven.apache.org/plugins/maven-shade-plugin/examples/resource-transformers.html#ServicesResourceTransformer On Mon, Aug 17, 2020 at 5:00 PM 赵一旦 <[hidden email]> wrote: > @RuiLi,你这个说法对的。我刚刚看了下,是这样的。因为我是shade掉的,倒是没丢失,但是多种connector和format相互覆盖了。 > > 所以说,这就是问题了。flink-json,flink-csv这种包没办法shade进去看样子? > > > 此外,如果我实现一个jar内同时提供flink-json和flink-csv的功能是不是当前这种机制也无法实现(为什么要依靠services目录下一个固定名字的文件来记录Factory实现类,这貌似就是导致这个问题的点)。 > > Rui Li <[hidden email]> 于2020年8月17日周一 下午3:46写道: > > > 可能是打fat > > jar的时候service文件被覆盖了?可以检查一下你jar包里面META-INF/services下面的文件,看是不是没有json的实现 > > > > On Fri, Aug 14, 2020 at 7:13 PM 赵一旦 <[hidden email]> wrote: > > > > > 代码如下: > > > // tEnv; > > > tEnv.sqlUpdate("create table dr1( " + > > > " cid STRING, " + > > > " server_time BIGINT, " + > > > " d MAP<STRING, STRING>, " + > > > " process_time AS PROCTIME(), " + > > > " event_time AS TO_TIMESTAMP(FROM_UNIXTIME(server_time / > 1000)), > > > " + > > > " WATERMARK FOR event_time AS event_time - INTERVAL '60' > SECOND > > > " + > > > ") WITH ( " + > > > " 'update-mode' = 'append', " + > > > " 'connector.type' = 'kafka', " + > > > " 'connector.version' = 'universal', " + > > > " 'connector.topic' = 'antibot_dr1', " + > > > " 'connector.startup-mode' = 'latest-offset', " + > > > " 'connector.properties.zookeeper.connect' = > > > 'yq01-sw-xxx03.yq01:8681', " + > > > " 'connector.properties.bootstrap.servers' = > > > 'yq01-sw-xxx03.yq01:8192', " + > > > " 'format.type' = 'json' " + > > > ")"); > > > Table t1 = tEnv.sqlQuery("select * from dr1"); > > > > > > 我打包会把flink-json打包进去,最终结果包是test.jar。 > > > > > > test.jar是个fat jar,相关依赖都有了。 > > > > > > 然后我执行:flink run -c test.SQLWC1 --detached test.jar 报错: > > > > > > Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: > > > Could not find a suitable table factory for > > > 'org.apache.flink.table.factories.DeserializationSchemaFactory' in > > > the classpath. > > > > > > 可是我flink-json.jar都打包进去了,居然还是报错。。。 > > > > > > 解决方式,必须是执行 flink run -c test.SQLWC1 --detached test.jar 这个命令的机器 > > > > > > 上的flink的环境中有flink-json这个包。但实际上这个机器只作为提交,实际执行任务的集群是另一个机器。 > > > > > > 搞不懂,FlinkSQL找依赖的过程到底啥情况,我fat jar打包进去的flink-json不会被考虑吗? > > > > > > > > > -- > > Best regards! > > Rui Li > > > -- Best regards! Rui Li |
In reply to this post by nobleyd
或者通过 flink run 方式运行的任务,能否像 sql-client.sh 那样通过-l,-j指定的jar也会被上传到集群呢?
赵一旦 <[hidden email]> 于2020年8月17日周一 下午5:34写道: > 小伙伴们,帮忙看下怎么解决呢? 通过写代码方式,基于table > api执行flink的sql。这种情况下用到的flink-json等包通过shade等方式做成一个大jar包之后依赖的问题。 > > 赵一旦 <[hidden email]> 于2020年8月17日周一 下午5:00写道: > >> @RuiLi,你这个说法对的。我刚刚看了下,是这样的。因为我是shade掉的,倒是没丢失,但是多种connector和format相互覆盖了。 >> >> 所以说,这就是问题了。flink-json,flink-csv这种包没办法shade进去看样子? >> >> >> 此外,如果我实现一个jar内同时提供flink-json和flink-csv的功能是不是当前这种机制也无法实现(为什么要依靠services目录下一个固定名字的文件来记录Factory实现类,这貌似就是导致这个问题的点)。 >> >> Rui Li <[hidden email]> 于2020年8月17日周一 下午3:46写道: >> >>> 可能是打fat >>> jar的时候service文件被覆盖了?可以检查一下你jar包里面META-INF/services下面的文件,看是不是没有json的实现 >>> >>> On Fri, Aug 14, 2020 at 7:13 PM 赵一旦 <[hidden email]> wrote: >>> >>> > 代码如下: >>> > // tEnv; >>> > tEnv.sqlUpdate("create table dr1( " + >>> > " cid STRING, " + >>> > " server_time BIGINT, " + >>> > " d MAP<STRING, STRING>, " + >>> > " process_time AS PROCTIME(), " + >>> > " event_time AS TO_TIMESTAMP(FROM_UNIXTIME(server_time / >>> 1000)), >>> > " + >>> > " WATERMARK FOR event_time AS event_time - INTERVAL '60' >>> SECOND >>> > " + >>> > ") WITH ( " + >>> > " 'update-mode' = 'append', " + >>> > " 'connector.type' = 'kafka', " + >>> > " 'connector.version' = 'universal', " + >>> > " 'connector.topic' = 'antibot_dr1', " + >>> > " 'connector.startup-mode' = 'latest-offset', " + >>> > " 'connector.properties.zookeeper.connect' = >>> > 'yq01-sw-xxx03.yq01:8681', " + >>> > " 'connector.properties.bootstrap.servers' = >>> > 'yq01-sw-xxx03.yq01:8192', " + >>> > " 'format.type' = 'json' " + >>> > ")"); >>> > Table t1 = tEnv.sqlQuery("select * from dr1"); >>> > >>> > 我打包会把flink-json打包进去,最终结果包是test.jar。 >>> > >>> > test.jar是个fat jar,相关依赖都有了。 >>> > >>> > 然后我执行:flink run -c test.SQLWC1 --detached test.jar 报错: >>> > >>> > Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: >>> > Could not find a suitable table factory for >>> > 'org.apache.flink.table.factories.DeserializationSchemaFactory' in >>> > the classpath. >>> > >>> > 可是我flink-json.jar都打包进去了,居然还是报错。。。 >>> > >>> > 解决方式,必须是执行 flink run -c test.SQLWC1 --detached test.jar 这个命令的机器 >>> > >>> > 上的flink的环境中有flink-json这个包。但实际上这个机器只作为提交,实际执行任务的集群是另一个机器。 >>> > >>> > 搞不懂,FlinkSQL找依赖的过程到底啥情况,我fat jar打包进去的flink-json不会被考虑吗? >>> > >>> >>> >>> -- >>> Best regards! >>> Rui Li >>> >> |
In reply to this post by Rui Li
哦哦。也就是说本身这个文件内内容是支持一行一个这样的格式的是吧?
Rui Li <[hidden email]> 于2020年8月17日周一 下午5:36写道: > 用shade plugin的时候可以指定service resource > transformer,应该能把多个service文件merge起来。具体可以参考: > > https://maven.apache.org/plugins/maven-shade-plugin/examples/resource-transformers.html#ServicesResourceTransformer > > On Mon, Aug 17, 2020 at 5:00 PM 赵一旦 <[hidden email]> wrote: > > > @RuiLi,你这个说法对的。我刚刚看了下,是这样的。因为我是shade掉的,倒是没丢失,但是多种connector和format相互覆盖了。 > > > > 所以说,这就是问题了。flink-json,flink-csv这种包没办法shade进去看样子? > > > > > > > 此外,如果我实现一个jar内同时提供flink-json和flink-csv的功能是不是当前这种机制也无法实现(为什么要依靠services目录下一个固定名字的文件来记录Factory实现类,这貌似就是导致这个问题的点)。 > > > > Rui Li <[hidden email]> 于2020年8月17日周一 下午3:46写道: > > > > > 可能是打fat > > > jar的时候service文件被覆盖了?可以检查一下你jar包里面META-INF/services下面的文件,看是不是没有json的实现 > > > > > > On Fri, Aug 14, 2020 at 7:13 PM 赵一旦 <[hidden email]> wrote: > > > > > > > 代码如下: > > > > // tEnv; > > > > tEnv.sqlUpdate("create table dr1( " + > > > > " cid STRING, " + > > > > " server_time BIGINT, " + > > > > " d MAP<STRING, STRING>, " + > > > > " process_time AS PROCTIME(), " + > > > > " event_time AS TO_TIMESTAMP(FROM_UNIXTIME(server_time / > > 1000)), > > > > " + > > > > " WATERMARK FOR event_time AS event_time - INTERVAL '60' > > SECOND > > > > " + > > > > ") WITH ( " + > > > > " 'update-mode' = 'append', " + > > > > " 'connector.type' = 'kafka', " + > > > > " 'connector.version' = 'universal', " + > > > > " 'connector.topic' = 'antibot_dr1', " + > > > > " 'connector.startup-mode' = 'latest-offset', " + > > > > " 'connector.properties.zookeeper.connect' = > > > > 'yq01-sw-xxx03.yq01:8681', " + > > > > " 'connector.properties.bootstrap.servers' = > > > > 'yq01-sw-xxx03.yq01:8192', " + > > > > " 'format.type' = 'json' " + > > > > ")"); > > > > Table t1 = tEnv.sqlQuery("select * from dr1"); > > > > > > > > 我打包会把flink-json打包进去,最终结果包是test.jar。 > > > > > > > > test.jar是个fat jar,相关依赖都有了。 > > > > > > > > 然后我执行:flink run -c test.SQLWC1 --detached test.jar 报错: > > > > > > > > Caused by: > org.apache.flink.table.api.NoMatchingTableFactoryException: > > > > Could not find a suitable table factory for > > > > 'org.apache.flink.table.factories.DeserializationSchemaFactory' in > > > > the classpath. > > > > > > > > 可是我flink-json.jar都打包进去了,居然还是报错。。。 > > > > > > > > 解决方式,必须是执行 flink run -c test.SQLWC1 --detached test.jar 这个命令的机器 > > > > > > > > 上的flink的环境中有flink-json这个包。但实际上这个机器只作为提交,实际执行任务的集群是另一个机器。 > > > > > > > > 搞不懂,FlinkSQL找依赖的过程到底啥情况,我fat jar打包进去的flink-json不会被考虑吗? > > > > > > > > > > > > > -- > > > Best regards! > > > Rui Li > > > > > > > > -- > Best regards! > Rui Li > |
对,这个文件每一行就是一个类名。另外你说的flink run指定额外的jar包应该可以通过-C参数来指定
On Mon, Aug 17, 2020 at 5:38 PM 赵一旦 <[hidden email]> wrote: > 哦哦。也就是说本身这个文件内内容是支持一行一个这样的格式的是吧? > > Rui Li <[hidden email]> 于2020年8月17日周一 下午5:36写道: > > > 用shade plugin的时候可以指定service resource > > transformer,应该能把多个service文件merge起来。具体可以参考: > > > > > https://maven.apache.org/plugins/maven-shade-plugin/examples/resource-transformers.html#ServicesResourceTransformer > > > > On Mon, Aug 17, 2020 at 5:00 PM 赵一旦 <[hidden email]> wrote: > > > > > @RuiLi,你这个说法对的。我刚刚看了下,是这样的。因为我是shade掉的,倒是没丢失,但是多种connector和format相互覆盖了。 > > > > > > 所以说,这就是问题了。flink-json,flink-csv这种包没办法shade进去看样子? > > > > > > > > > > > > 此外,如果我实现一个jar内同时提供flink-json和flink-csv的功能是不是当前这种机制也无法实现(为什么要依靠services目录下一个固定名字的文件来记录Factory实现类,这貌似就是导致这个问题的点)。 > > > > > > Rui Li <[hidden email]> 于2020年8月17日周一 下午3:46写道: > > > > > > > 可能是打fat > > > > jar的时候service文件被覆盖了?可以检查一下你jar包里面META-INF/services下面的文件,看是不是没有json的实现 > > > > > > > > On Fri, Aug 14, 2020 at 7:13 PM 赵一旦 <[hidden email]> wrote: > > > > > > > > > 代码如下: > > > > > // tEnv; > > > > > tEnv.sqlUpdate("create table dr1( " + > > > > > " cid STRING, " + > > > > > " server_time BIGINT, " + > > > > > " d MAP<STRING, STRING>, " + > > > > > " process_time AS PROCTIME(), " + > > > > > " event_time AS TO_TIMESTAMP(FROM_UNIXTIME(server_time / > > > 1000)), > > > > > " + > > > > > " WATERMARK FOR event_time AS event_time - INTERVAL '60' > > > SECOND > > > > > " + > > > > > ") WITH ( " + > > > > > " 'update-mode' = 'append', " + > > > > > " 'connector.type' = 'kafka', " + > > > > > " 'connector.version' = 'universal', " + > > > > > " 'connector.topic' = 'antibot_dr1', " + > > > > > " 'connector.startup-mode' = 'latest-offset', " + > > > > > " 'connector.properties.zookeeper.connect' = > > > > > 'yq01-sw-xxx03.yq01:8681', " + > > > > > " 'connector.properties.bootstrap.servers' = > > > > > 'yq01-sw-xxx03.yq01:8192', " + > > > > > " 'format.type' = 'json' " + > > > > > ")"); > > > > > Table t1 = tEnv.sqlQuery("select * from dr1"); > > > > > > > > > > 我打包会把flink-json打包进去,最终结果包是test.jar。 > > > > > > > > > > test.jar是个fat jar,相关依赖都有了。 > > > > > > > > > > 然后我执行:flink run -c test.SQLWC1 --detached test.jar 报错: > > > > > > > > > > Caused by: > > org.apache.flink.table.api.NoMatchingTableFactoryException: > > > > > Could not find a suitable table factory for > > > > > 'org.apache.flink.table.factories.DeserializationSchemaFactory' in > > > > > the classpath. > > > > > > > > > > 可是我flink-json.jar都打包进去了,居然还是报错。。。 > > > > > > > > > > 解决方式,必须是执行 flink run -c test.SQLWC1 --detached test.jar 这个命令的机器 > > > > > > > > > > 上的flink的环境中有flink-json这个包。但实际上这个机器只作为提交,实际执行任务的集群是另一个机器。 > > > > > > > > > > 搞不懂,FlinkSQL找依赖的过程到底啥情况,我fat jar打包进去的flink-json不会被考虑吗? > > > > > > > > > > > > > > > > > -- > > > > Best regards! > > > > Rui Li > > > > > > > > > > > > > -- > > Best regards! > > Rui Li > > > -- Best regards! Rui Li |
不一样吧。-C 指定的是必须所有集群机器能访问的。sql-client指定的那个按照之前其他同学的说法是会被上传的。
Rui Li <[hidden email]> 于2020年8月17日周一 下午5:40写道: > 对,这个文件每一行就是一个类名。另外你说的flink run指定额外的jar包应该可以通过-C参数来指定 > > On Mon, Aug 17, 2020 at 5:38 PM 赵一旦 <[hidden email]> wrote: > > > 哦哦。也就是说本身这个文件内内容是支持一行一个这样的格式的是吧? > > > > Rui Li <[hidden email]> 于2020年8月17日周一 下午5:36写道: > > > > > 用shade plugin的时候可以指定service resource > > > transformer,应该能把多个service文件merge起来。具体可以参考: > > > > > > > > > https://maven.apache.org/plugins/maven-shade-plugin/examples/resource-transformers.html#ServicesResourceTransformer > > > > > > On Mon, Aug 17, 2020 at 5:00 PM 赵一旦 <[hidden email]> wrote: > > > > > > > > @RuiLi,你这个说法对的。我刚刚看了下,是这样的。因为我是shade掉的,倒是没丢失,但是多种connector和format相互覆盖了。 > > > > > > > > 所以说,这就是问题了。flink-json,flink-csv这种包没办法shade进去看样子? > > > > > > > > > > > > > > > > > > 此外,如果我实现一个jar内同时提供flink-json和flink-csv的功能是不是当前这种机制也无法实现(为什么要依靠services目录下一个固定名字的文件来记录Factory实现类,这貌似就是导致这个问题的点)。 > > > > > > > > Rui Li <[hidden email]> 于2020年8月17日周一 下午3:46写道: > > > > > > > > > 可能是打fat > > > > > > jar的时候service文件被覆盖了?可以检查一下你jar包里面META-INF/services下面的文件,看是不是没有json的实现 > > > > > > > > > > On Fri, Aug 14, 2020 at 7:13 PM 赵一旦 <[hidden email]> wrote: > > > > > > > > > > > 代码如下: > > > > > > // tEnv; > > > > > > tEnv.sqlUpdate("create table dr1( " + > > > > > > " cid STRING, " + > > > > > > " server_time BIGINT, " + > > > > > > " d MAP<STRING, STRING>, " + > > > > > > " process_time AS PROCTIME(), " + > > > > > > " event_time AS TO_TIMESTAMP(FROM_UNIXTIME(server_time / > > > > 1000)), > > > > > > " + > > > > > > " WATERMARK FOR event_time AS event_time - INTERVAL '60' > > > > SECOND > > > > > > " + > > > > > > ") WITH ( " + > > > > > > " 'update-mode' = 'append', " + > > > > > > " 'connector.type' = 'kafka', " + > > > > > > " 'connector.version' = 'universal', " + > > > > > > " 'connector.topic' = 'antibot_dr1', " + > > > > > > " 'connector.startup-mode' = 'latest-offset', " + > > > > > > " 'connector.properties.zookeeper.connect' = > > > > > > 'yq01-sw-xxx03.yq01:8681', " + > > > > > > " 'connector.properties.bootstrap.servers' = > > > > > > 'yq01-sw-xxx03.yq01:8192', " + > > > > > > " 'format.type' = 'json' " + > > > > > > ")"); > > > > > > Table t1 = tEnv.sqlQuery("select * from dr1"); > > > > > > > > > > > > 我打包会把flink-json打包进去,最终结果包是test.jar。 > > > > > > > > > > > > test.jar是个fat jar,相关依赖都有了。 > > > > > > > > > > > > 然后我执行:flink run -c test.SQLWC1 --detached test.jar 报错: > > > > > > > > > > > > Caused by: > > > org.apache.flink.table.api.NoMatchingTableFactoryException: > > > > > > Could not find a suitable table factory for > > > > > > 'org.apache.flink.table.factories.DeserializationSchemaFactory' > in > > > > > > the classpath. > > > > > > > > > > > > 可是我flink-json.jar都打包进去了,居然还是报错。。。 > > > > > > > > > > > > 解决方式,必须是执行 flink run -c test.SQLWC1 --detached test.jar 这个命令的机器 > > > > > > > > > > > > 上的flink的环境中有flink-json这个包。但实际上这个机器只作为提交,实际执行任务的集群是另一个机器。 > > > > > > > > > > > > 搞不懂,FlinkSQL找依赖的过程到底啥情况,我fat jar打包进去的flink-json不会被考虑吗? > > > > > > > > > > > > > > > > > > > > > -- > > > > > Best regards! > > > > > Rui Li > > > > > > > > > > > > > > > > > > -- > > > Best regards! > > > Rui Li > > > > > > > > -- > Best regards! > Rui Li > |
没错,-C是需要每台机器上能访问到的
On Mon, Aug 17, 2020 at 5:56 PM 赵一旦 <[hidden email]> wrote: > 不一样吧。-C 指定的是必须所有集群机器能访问的。sql-client指定的那个按照之前其他同学的说法是会被上传的。 > > Rui Li <[hidden email]> 于2020年8月17日周一 下午5:40写道: > > > 对,这个文件每一行就是一个类名。另外你说的flink run指定额外的jar包应该可以通过-C参数来指定 > > > > On Mon, Aug 17, 2020 at 5:38 PM 赵一旦 <[hidden email]> wrote: > > > > > 哦哦。也就是说本身这个文件内内容是支持一行一个这样的格式的是吧? > > > > > > Rui Li <[hidden email]> 于2020年8月17日周一 下午5:36写道: > > > > > > > 用shade plugin的时候可以指定service resource > > > > transformer,应该能把多个service文件merge起来。具体可以参考: > > > > > > > > > > > > > > https://maven.apache.org/plugins/maven-shade-plugin/examples/resource-transformers.html#ServicesResourceTransformer > > > > > > > > On Mon, Aug 17, 2020 at 5:00 PM 赵一旦 <[hidden email]> wrote: > > > > > > > > > > > @RuiLi,你这个说法对的。我刚刚看了下,是这样的。因为我是shade掉的,倒是没丢失,但是多种connector和format相互覆盖了。 > > > > > > > > > > 所以说,这就是问题了。flink-json,flink-csv这种包没办法shade进去看样子? > > > > > > > > > > > > > > > > > > > > > > > > > 此外,如果我实现一个jar内同时提供flink-json和flink-csv的功能是不是当前这种机制也无法实现(为什么要依靠services目录下一个固定名字的文件来记录Factory实现类,这貌似就是导致这个问题的点)。 > > > > > > > > > > Rui Li <[hidden email]> 于2020年8月17日周一 下午3:46写道: > > > > > > > > > > > 可能是打fat > > > > > > > > jar的时候service文件被覆盖了?可以检查一下你jar包里面META-INF/services下面的文件,看是不是没有json的实现 > > > > > > > > > > > > On Fri, Aug 14, 2020 at 7:13 PM 赵一旦 <[hidden email]> wrote: > > > > > > > > > > > > > 代码如下: > > > > > > > // tEnv; > > > > > > > tEnv.sqlUpdate("create table dr1( " + > > > > > > > " cid STRING, " + > > > > > > > " server_time BIGINT, " + > > > > > > > " d MAP<STRING, STRING>, " + > > > > > > > " process_time AS PROCTIME(), " + > > > > > > > " event_time AS > TO_TIMESTAMP(FROM_UNIXTIME(server_time / > > > > > 1000)), > > > > > > > " + > > > > > > > " WATERMARK FOR event_time AS event_time - INTERVAL > '60' > > > > > SECOND > > > > > > > " + > > > > > > > ") WITH ( " + > > > > > > > " 'update-mode' = 'append', " + > > > > > > > " 'connector.type' = 'kafka', " + > > > > > > > " 'connector.version' = 'universal', " + > > > > > > > " 'connector.topic' = 'antibot_dr1', " + > > > > > > > " 'connector.startup-mode' = 'latest-offset', " + > > > > > > > " 'connector.properties.zookeeper.connect' = > > > > > > > 'yq01-sw-xxx03.yq01:8681', " + > > > > > > > " 'connector.properties.bootstrap.servers' = > > > > > > > 'yq01-sw-xxx03.yq01:8192', " + > > > > > > > " 'format.type' = 'json' " + > > > > > > > ")"); > > > > > > > Table t1 = tEnv.sqlQuery("select * from dr1"); > > > > > > > > > > > > > > 我打包会把flink-json打包进去,最终结果包是test.jar。 > > > > > > > > > > > > > > test.jar是个fat jar,相关依赖都有了。 > > > > > > > > > > > > > > 然后我执行:flink run -c test.SQLWC1 --detached test.jar 报错: > > > > > > > > > > > > > > Caused by: > > > > org.apache.flink.table.api.NoMatchingTableFactoryException: > > > > > > > Could not find a suitable table factory for > > > > > > > 'org.apache.flink.table.factories.DeserializationSchemaFactory' > > in > > > > > > > the classpath. > > > > > > > > > > > > > > 可是我flink-json.jar都打包进去了,居然还是报错。。。 > > > > > > > > > > > > > > 解决方式,必须是执行 flink run -c test.SQLWC1 --detached test.jar > 这个命令的机器 > > > > > > > > > > > > > > 上的flink的环境中有flink-json这个包。但实际上这个机器只作为提交,实际执行任务的集群是另一个机器。 > > > > > > > > > > > > > > 搞不懂,FlinkSQL找依赖的过程到底啥情况,我fat jar打包进去的flink-json不会被考虑吗? > > > > > > > > > > > > > > > > > > > > > > > > > -- > > > > > > Best regards! > > > > > > Rui Li > > > > > > > > > > > > > > > > > > > > > > > -- > > > > Best regards! > > > > Rui Li > > > > > > > > > > > > > -- > > Best regards! > > Rui Li > > > -- Best regards! Rui Li |
Free forum by Nabble | Edit this page |