FlinkSQL tableEnv 依赖问题

classic Classic list List threaded Threaded
10 messages Options
Reply | Threaded
Open this post in threaded view
|

FlinkSQL tableEnv 依赖问题

nobleyd
代码如下:
// tEnv;
tEnv.sqlUpdate("create table dr1(  " +
        "  cid STRING,  " +
        "  server_time BIGINT,  " +
        "  d MAP<STRING, STRING>,  " +
        "  process_time AS PROCTIME(),  " +
        "  event_time AS TO_TIMESTAMP(FROM_UNIXTIME(server_time / 1000)),  " +
        "  WATERMARK FOR event_time AS event_time - INTERVAL '60' SECOND  " +
        ") WITH (  " +
        "  'update-mode' = 'append',  " +
        "  'connector.type' = 'kafka',  " +
        "  'connector.version' = 'universal',  " +
        "  'connector.topic' = 'antibot_dr1',  " +
        "  'connector.startup-mode' = 'latest-offset',  " +
        "  'connector.properties.zookeeper.connect' =
'yq01-sw-xxx03.yq01:8681',  " +
        "  'connector.properties.bootstrap.servers' =
'yq01-sw-xxx03.yq01:8192',  " +
        "  'format.type' = 'json'  " +
        ")");
Table t1 = tEnv.sqlQuery("select * from dr1");

我打包会把flink-json打包进去,最终结果包是test.jar。

test.jar是个fat jar,相关依赖都有了。

然后我执行:flink run -c test.SQLWC1 --detached  test.jar 报错:

Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException:
Could not find a suitable table factory for
'org.apache.flink.table.factories.DeserializationSchemaFactory' in
the classpath.

可是我flink-json.jar都打包进去了,居然还是报错。。。

解决方式,必须是执行 flink run -c test.SQLWC1 --detached  test.jar 这个命令的机器

上的flink的环境中有flink-json这个包。但实际上这个机器只作为提交,实际执行任务的集群是另一个机器。

搞不懂,FlinkSQL找依赖的过程到底啥情况,我fat jar打包进去的flink-json不会被考虑吗?
Reply | Threaded
Open this post in threaded view
|

Re: FlinkSQL tableEnv 依赖问题

Rui Li
可能是打fat
jar的时候service文件被覆盖了?可以检查一下你jar包里面META-INF/services下面的文件,看是不是没有json的实现

On Fri, Aug 14, 2020 at 7:13 PM 赵一旦 <[hidden email]> wrote:

> 代码如下:
> // tEnv;
> tEnv.sqlUpdate("create table dr1(  " +
>         "  cid STRING,  " +
>         "  server_time BIGINT,  " +
>         "  d MAP<STRING, STRING>,  " +
>         "  process_time AS PROCTIME(),  " +
>         "  event_time AS TO_TIMESTAMP(FROM_UNIXTIME(server_time / 1000)),
> " +
>         "  WATERMARK FOR event_time AS event_time - INTERVAL '60' SECOND
> " +
>         ") WITH (  " +
>         "  'update-mode' = 'append',  " +
>         "  'connector.type' = 'kafka',  " +
>         "  'connector.version' = 'universal',  " +
>         "  'connector.topic' = 'antibot_dr1',  " +
>         "  'connector.startup-mode' = 'latest-offset',  " +
>         "  'connector.properties.zookeeper.connect' =
> 'yq01-sw-xxx03.yq01:8681',  " +
>         "  'connector.properties.bootstrap.servers' =
> 'yq01-sw-xxx03.yq01:8192',  " +
>         "  'format.type' = 'json'  " +
>         ")");
> Table t1 = tEnv.sqlQuery("select * from dr1");
>
> 我打包会把flink-json打包进去,最终结果包是test.jar。
>
> test.jar是个fat jar,相关依赖都有了。
>
> 然后我执行:flink run -c test.SQLWC1 --detached  test.jar 报错:
>
> Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException:
> Could not find a suitable table factory for
> 'org.apache.flink.table.factories.DeserializationSchemaFactory' in
> the classpath.
>
> 可是我flink-json.jar都打包进去了,居然还是报错。。。
>
> 解决方式,必须是执行 flink run -c test.SQLWC1 --detached  test.jar 这个命令的机器
>
> 上的flink的环境中有flink-json这个包。但实际上这个机器只作为提交,实际执行任务的集群是另一个机器。
>
> 搞不懂,FlinkSQL找依赖的过程到底啥情况,我fat jar打包进去的flink-json不会被考虑吗?
>


--
Best regards!
Rui Li
Reply | Threaded
Open this post in threaded view
|

Re: FlinkSQL tableEnv 依赖问题

nobleyd
@RuiLi,你这个说法对的。我刚刚看了下,是这样的。因为我是shade掉的,倒是没丢失,但是多种connector和format相互覆盖了。

所以说,这就是问题了。flink-json,flink-csv这种包没办法shade进去看样子?

此外,如果我实现一个jar内同时提供flink-json和flink-csv的功能是不是当前这种机制也无法实现(为什么要依靠services目录下一个固定名字的文件来记录Factory实现类,这貌似就是导致这个问题的点)。

Rui Li <[hidden email]> 于2020年8月17日周一 下午3:46写道:

> 可能是打fat
> jar的时候service文件被覆盖了?可以检查一下你jar包里面META-INF/services下面的文件,看是不是没有json的实现
>
> On Fri, Aug 14, 2020 at 7:13 PM 赵一旦 <[hidden email]> wrote:
>
> > 代码如下:
> > // tEnv;
> > tEnv.sqlUpdate("create table dr1(  " +
> >         "  cid STRING,  " +
> >         "  server_time BIGINT,  " +
> >         "  d MAP<STRING, STRING>,  " +
> >         "  process_time AS PROCTIME(),  " +
> >         "  event_time AS TO_TIMESTAMP(FROM_UNIXTIME(server_time / 1000)),
> > " +
> >         "  WATERMARK FOR event_time AS event_time - INTERVAL '60' SECOND
> > " +
> >         ") WITH (  " +
> >         "  'update-mode' = 'append',  " +
> >         "  'connector.type' = 'kafka',  " +
> >         "  'connector.version' = 'universal',  " +
> >         "  'connector.topic' = 'antibot_dr1',  " +
> >         "  'connector.startup-mode' = 'latest-offset',  " +
> >         "  'connector.properties.zookeeper.connect' =
> > 'yq01-sw-xxx03.yq01:8681',  " +
> >         "  'connector.properties.bootstrap.servers' =
> > 'yq01-sw-xxx03.yq01:8192',  " +
> >         "  'format.type' = 'json'  " +
> >         ")");
> > Table t1 = tEnv.sqlQuery("select * from dr1");
> >
> > 我打包会把flink-json打包进去,最终结果包是test.jar。
> >
> > test.jar是个fat jar,相关依赖都有了。
> >
> > 然后我执行:flink run -c test.SQLWC1 --detached  test.jar 报错:
> >
> > Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException:
> > Could not find a suitable table factory for
> > 'org.apache.flink.table.factories.DeserializationSchemaFactory' in
> > the classpath.
> >
> > 可是我flink-json.jar都打包进去了,居然还是报错。。。
> >
> > 解决方式,必须是执行 flink run -c test.SQLWC1 --detached  test.jar 这个命令的机器
> >
> > 上的flink的环境中有flink-json这个包。但实际上这个机器只作为提交,实际执行任务的集群是另一个机器。
> >
> > 搞不懂,FlinkSQL找依赖的过程到底啥情况,我fat jar打包进去的flink-json不会被考虑吗?
> >
>
>
> --
> Best regards!
> Rui Li
>
Reply | Threaded
Open this post in threaded view
|

Re: FlinkSQL tableEnv 依赖问题

nobleyd
小伙伴们,帮忙看下怎么解决呢? 通过写代码方式,基于table
api执行flink的sql。这种情况下用到的flink-json等包通过shade等方式做成一个大jar包之后依赖的问题。

赵一旦 <[hidden email]> 于2020年8月17日周一 下午5:00写道:

> @RuiLi,你这个说法对的。我刚刚看了下,是这样的。因为我是shade掉的,倒是没丢失,但是多种connector和format相互覆盖了。
>
> 所以说,这就是问题了。flink-json,flink-csv这种包没办法shade进去看样子?
>
>
> 此外,如果我实现一个jar内同时提供flink-json和flink-csv的功能是不是当前这种机制也无法实现(为什么要依靠services目录下一个固定名字的文件来记录Factory实现类,这貌似就是导致这个问题的点)。
>
> Rui Li <[hidden email]> 于2020年8月17日周一 下午3:46写道:
>
>> 可能是打fat
>> jar的时候service文件被覆盖了?可以检查一下你jar包里面META-INF/services下面的文件,看是不是没有json的实现
>>
>> On Fri, Aug 14, 2020 at 7:13 PM 赵一旦 <[hidden email]> wrote:
>>
>> > 代码如下:
>> > // tEnv;
>> > tEnv.sqlUpdate("create table dr1(  " +
>> >         "  cid STRING,  " +
>> >         "  server_time BIGINT,  " +
>> >         "  d MAP<STRING, STRING>,  " +
>> >         "  process_time AS PROCTIME(),  " +
>> >         "  event_time AS TO_TIMESTAMP(FROM_UNIXTIME(server_time /
>> 1000)),
>> > " +
>> >         "  WATERMARK FOR event_time AS event_time - INTERVAL '60' SECOND
>> > " +
>> >         ") WITH (  " +
>> >         "  'update-mode' = 'append',  " +
>> >         "  'connector.type' = 'kafka',  " +
>> >         "  'connector.version' = 'universal',  " +
>> >         "  'connector.topic' = 'antibot_dr1',  " +
>> >         "  'connector.startup-mode' = 'latest-offset',  " +
>> >         "  'connector.properties.zookeeper.connect' =
>> > 'yq01-sw-xxx03.yq01:8681',  " +
>> >         "  'connector.properties.bootstrap.servers' =
>> > 'yq01-sw-xxx03.yq01:8192',  " +
>> >         "  'format.type' = 'json'  " +
>> >         ")");
>> > Table t1 = tEnv.sqlQuery("select * from dr1");
>> >
>> > 我打包会把flink-json打包进去,最终结果包是test.jar。
>> >
>> > test.jar是个fat jar,相关依赖都有了。
>> >
>> > 然后我执行:flink run -c test.SQLWC1 --detached  test.jar 报错:
>> >
>> > Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException:
>> > Could not find a suitable table factory for
>> > 'org.apache.flink.table.factories.DeserializationSchemaFactory' in
>> > the classpath.
>> >
>> > 可是我flink-json.jar都打包进去了,居然还是报错。。。
>> >
>> > 解决方式,必须是执行 flink run -c test.SQLWC1 --detached  test.jar 这个命令的机器
>> >
>> > 上的flink的环境中有flink-json这个包。但实际上这个机器只作为提交,实际执行任务的集群是另一个机器。
>> >
>> > 搞不懂,FlinkSQL找依赖的过程到底啥情况,我fat jar打包进去的flink-json不会被考虑吗?
>> >
>>
>>
>> --
>> Best regards!
>> Rui Li
>>
>
Reply | Threaded
Open this post in threaded view
|

Re: FlinkSQL tableEnv 依赖问题

Rui Li
In reply to this post by nobleyd
用shade plugin的时候可以指定service resource
transformer,应该能把多个service文件merge起来。具体可以参考:
https://maven.apache.org/plugins/maven-shade-plugin/examples/resource-transformers.html#ServicesResourceTransformer

On Mon, Aug 17, 2020 at 5:00 PM 赵一旦 <[hidden email]> wrote:

> @RuiLi,你这个说法对的。我刚刚看了下,是这样的。因为我是shade掉的,倒是没丢失,但是多种connector和format相互覆盖了。
>
> 所以说,这就是问题了。flink-json,flink-csv这种包没办法shade进去看样子?
>
>
> 此外,如果我实现一个jar内同时提供flink-json和flink-csv的功能是不是当前这种机制也无法实现(为什么要依靠services目录下一个固定名字的文件来记录Factory实现类,这貌似就是导致这个问题的点)。
>
> Rui Li <[hidden email]> 于2020年8月17日周一 下午3:46写道:
>
> > 可能是打fat
> > jar的时候service文件被覆盖了?可以检查一下你jar包里面META-INF/services下面的文件,看是不是没有json的实现
> >
> > On Fri, Aug 14, 2020 at 7:13 PM 赵一旦 <[hidden email]> wrote:
> >
> > > 代码如下:
> > > // tEnv;
> > > tEnv.sqlUpdate("create table dr1(  " +
> > >         "  cid STRING,  " +
> > >         "  server_time BIGINT,  " +
> > >         "  d MAP<STRING, STRING>,  " +
> > >         "  process_time AS PROCTIME(),  " +
> > >         "  event_time AS TO_TIMESTAMP(FROM_UNIXTIME(server_time /
> 1000)),
> > > " +
> > >         "  WATERMARK FOR event_time AS event_time - INTERVAL '60'
> SECOND
> > > " +
> > >         ") WITH (  " +
> > >         "  'update-mode' = 'append',  " +
> > >         "  'connector.type' = 'kafka',  " +
> > >         "  'connector.version' = 'universal',  " +
> > >         "  'connector.topic' = 'antibot_dr1',  " +
> > >         "  'connector.startup-mode' = 'latest-offset',  " +
> > >         "  'connector.properties.zookeeper.connect' =
> > > 'yq01-sw-xxx03.yq01:8681',  " +
> > >         "  'connector.properties.bootstrap.servers' =
> > > 'yq01-sw-xxx03.yq01:8192',  " +
> > >         "  'format.type' = 'json'  " +
> > >         ")");
> > > Table t1 = tEnv.sqlQuery("select * from dr1");
> > >
> > > 我打包会把flink-json打包进去,最终结果包是test.jar。
> > >
> > > test.jar是个fat jar,相关依赖都有了。
> > >
> > > 然后我执行:flink run -c test.SQLWC1 --detached  test.jar 报错:
> > >
> > > Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException:
> > > Could not find a suitable table factory for
> > > 'org.apache.flink.table.factories.DeserializationSchemaFactory' in
> > > the classpath.
> > >
> > > 可是我flink-json.jar都打包进去了,居然还是报错。。。
> > >
> > > 解决方式,必须是执行 flink run -c test.SQLWC1 --detached  test.jar 这个命令的机器
> > >
> > > 上的flink的环境中有flink-json这个包。但实际上这个机器只作为提交,实际执行任务的集群是另一个机器。
> > >
> > > 搞不懂,FlinkSQL找依赖的过程到底啥情况,我fat jar打包进去的flink-json不会被考虑吗?
> > >
> >
> >
> > --
> > Best regards!
> > Rui Li
> >
>


--
Best regards!
Rui Li
Reply | Threaded
Open this post in threaded view
|

Re: FlinkSQL tableEnv 依赖问题

nobleyd
In reply to this post by nobleyd
或者通过 flink run 方式运行的任务,能否像 sql-client.sh 那样通过-l,-j指定的jar也会被上传到集群呢?

赵一旦 <[hidden email]> 于2020年8月17日周一 下午5:34写道:

> 小伙伴们,帮忙看下怎么解决呢? 通过写代码方式,基于table
> api执行flink的sql。这种情况下用到的flink-json等包通过shade等方式做成一个大jar包之后依赖的问题。
>
> 赵一旦 <[hidden email]> 于2020年8月17日周一 下午5:00写道:
>
>> @RuiLi,你这个说法对的。我刚刚看了下,是这样的。因为我是shade掉的,倒是没丢失,但是多种connector和format相互覆盖了。
>>
>> 所以说,这就是问题了。flink-json,flink-csv这种包没办法shade进去看样子?
>>
>>
>> 此外,如果我实现一个jar内同时提供flink-json和flink-csv的功能是不是当前这种机制也无法实现(为什么要依靠services目录下一个固定名字的文件来记录Factory实现类,这貌似就是导致这个问题的点)。
>>
>> Rui Li <[hidden email]> 于2020年8月17日周一 下午3:46写道:
>>
>>> 可能是打fat
>>> jar的时候service文件被覆盖了?可以检查一下你jar包里面META-INF/services下面的文件,看是不是没有json的实现
>>>
>>> On Fri, Aug 14, 2020 at 7:13 PM 赵一旦 <[hidden email]> wrote:
>>>
>>> > 代码如下:
>>> > // tEnv;
>>> > tEnv.sqlUpdate("create table dr1(  " +
>>> >         "  cid STRING,  " +
>>> >         "  server_time BIGINT,  " +
>>> >         "  d MAP<STRING, STRING>,  " +
>>> >         "  process_time AS PROCTIME(),  " +
>>> >         "  event_time AS TO_TIMESTAMP(FROM_UNIXTIME(server_time /
>>> 1000)),
>>> > " +
>>> >         "  WATERMARK FOR event_time AS event_time - INTERVAL '60'
>>> SECOND
>>> > " +
>>> >         ") WITH (  " +
>>> >         "  'update-mode' = 'append',  " +
>>> >         "  'connector.type' = 'kafka',  " +
>>> >         "  'connector.version' = 'universal',  " +
>>> >         "  'connector.topic' = 'antibot_dr1',  " +
>>> >         "  'connector.startup-mode' = 'latest-offset',  " +
>>> >         "  'connector.properties.zookeeper.connect' =
>>> > 'yq01-sw-xxx03.yq01:8681',  " +
>>> >         "  'connector.properties.bootstrap.servers' =
>>> > 'yq01-sw-xxx03.yq01:8192',  " +
>>> >         "  'format.type' = 'json'  " +
>>> >         ")");
>>> > Table t1 = tEnv.sqlQuery("select * from dr1");
>>> >
>>> > 我打包会把flink-json打包进去,最终结果包是test.jar。
>>> >
>>> > test.jar是个fat jar,相关依赖都有了。
>>> >
>>> > 然后我执行:flink run -c test.SQLWC1 --detached  test.jar 报错:
>>> >
>>> > Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException:
>>> > Could not find a suitable table factory for
>>> > 'org.apache.flink.table.factories.DeserializationSchemaFactory' in
>>> > the classpath.
>>> >
>>> > 可是我flink-json.jar都打包进去了,居然还是报错。。。
>>> >
>>> > 解决方式,必须是执行 flink run -c test.SQLWC1 --detached  test.jar 这个命令的机器
>>> >
>>> > 上的flink的环境中有flink-json这个包。但实际上这个机器只作为提交,实际执行任务的集群是另一个机器。
>>> >
>>> > 搞不懂,FlinkSQL找依赖的过程到底啥情况,我fat jar打包进去的flink-json不会被考虑吗?
>>> >
>>>
>>>
>>> --
>>> Best regards!
>>> Rui Li
>>>
>>
Reply | Threaded
Open this post in threaded view
|

Re: FlinkSQL tableEnv 依赖问题

nobleyd
In reply to this post by Rui Li
哦哦。也就是说本身这个文件内内容是支持一行一个这样的格式的是吧?

Rui Li <[hidden email]> 于2020年8月17日周一 下午5:36写道:

> 用shade plugin的时候可以指定service resource
> transformer,应该能把多个service文件merge起来。具体可以参考:
>
> https://maven.apache.org/plugins/maven-shade-plugin/examples/resource-transformers.html#ServicesResourceTransformer
>
> On Mon, Aug 17, 2020 at 5:00 PM 赵一旦 <[hidden email]> wrote:
>
> > @RuiLi,你这个说法对的。我刚刚看了下,是这样的。因为我是shade掉的,倒是没丢失,但是多种connector和format相互覆盖了。
> >
> > 所以说,这就是问题了。flink-json,flink-csv这种包没办法shade进去看样子?
> >
> >
> >
> 此外,如果我实现一个jar内同时提供flink-json和flink-csv的功能是不是当前这种机制也无法实现(为什么要依靠services目录下一个固定名字的文件来记录Factory实现类,这貌似就是导致这个问题的点)。
> >
> > Rui Li <[hidden email]> 于2020年8月17日周一 下午3:46写道:
> >
> > > 可能是打fat
> > > jar的时候service文件被覆盖了?可以检查一下你jar包里面META-INF/services下面的文件,看是不是没有json的实现
> > >
> > > On Fri, Aug 14, 2020 at 7:13 PM 赵一旦 <[hidden email]> wrote:
> > >
> > > > 代码如下:
> > > > // tEnv;
> > > > tEnv.sqlUpdate("create table dr1(  " +
> > > >         "  cid STRING,  " +
> > > >         "  server_time BIGINT,  " +
> > > >         "  d MAP<STRING, STRING>,  " +
> > > >         "  process_time AS PROCTIME(),  " +
> > > >         "  event_time AS TO_TIMESTAMP(FROM_UNIXTIME(server_time /
> > 1000)),
> > > > " +
> > > >         "  WATERMARK FOR event_time AS event_time - INTERVAL '60'
> > SECOND
> > > > " +
> > > >         ") WITH (  " +
> > > >         "  'update-mode' = 'append',  " +
> > > >         "  'connector.type' = 'kafka',  " +
> > > >         "  'connector.version' = 'universal',  " +
> > > >         "  'connector.topic' = 'antibot_dr1',  " +
> > > >         "  'connector.startup-mode' = 'latest-offset',  " +
> > > >         "  'connector.properties.zookeeper.connect' =
> > > > 'yq01-sw-xxx03.yq01:8681',  " +
> > > >         "  'connector.properties.bootstrap.servers' =
> > > > 'yq01-sw-xxx03.yq01:8192',  " +
> > > >         "  'format.type' = 'json'  " +
> > > >         ")");
> > > > Table t1 = tEnv.sqlQuery("select * from dr1");
> > > >
> > > > 我打包会把flink-json打包进去,最终结果包是test.jar。
> > > >
> > > > test.jar是个fat jar,相关依赖都有了。
> > > >
> > > > 然后我执行:flink run -c test.SQLWC1 --detached  test.jar 报错:
> > > >
> > > > Caused by:
> org.apache.flink.table.api.NoMatchingTableFactoryException:
> > > > Could not find a suitable table factory for
> > > > 'org.apache.flink.table.factories.DeserializationSchemaFactory' in
> > > > the classpath.
> > > >
> > > > 可是我flink-json.jar都打包进去了,居然还是报错。。。
> > > >
> > > > 解决方式,必须是执行 flink run -c test.SQLWC1 --detached  test.jar 这个命令的机器
> > > >
> > > > 上的flink的环境中有flink-json这个包。但实际上这个机器只作为提交,实际执行任务的集群是另一个机器。
> > > >
> > > > 搞不懂,FlinkSQL找依赖的过程到底啥情况,我fat jar打包进去的flink-json不会被考虑吗?
> > > >
> > >
> > >
> > > --
> > > Best regards!
> > > Rui Li
> > >
> >
>
>
> --
> Best regards!
> Rui Li
>
Reply | Threaded
Open this post in threaded view
|

Re: FlinkSQL tableEnv 依赖问题

Rui Li
对,这个文件每一行就是一个类名。另外你说的flink run指定额外的jar包应该可以通过-C参数来指定

On Mon, Aug 17, 2020 at 5:38 PM 赵一旦 <[hidden email]> wrote:

> 哦哦。也就是说本身这个文件内内容是支持一行一个这样的格式的是吧?
>
> Rui Li <[hidden email]> 于2020年8月17日周一 下午5:36写道:
>
> > 用shade plugin的时候可以指定service resource
> > transformer,应该能把多个service文件merge起来。具体可以参考:
> >
> >
> https://maven.apache.org/plugins/maven-shade-plugin/examples/resource-transformers.html#ServicesResourceTransformer
> >
> > On Mon, Aug 17, 2020 at 5:00 PM 赵一旦 <[hidden email]> wrote:
> >
> > > @RuiLi,你这个说法对的。我刚刚看了下,是这样的。因为我是shade掉的,倒是没丢失,但是多种connector和format相互覆盖了。
> > >
> > > 所以说,这就是问题了。flink-json,flink-csv这种包没办法shade进去看样子?
> > >
> > >
> > >
> >
> 此外,如果我实现一个jar内同时提供flink-json和flink-csv的功能是不是当前这种机制也无法实现(为什么要依靠services目录下一个固定名字的文件来记录Factory实现类,这貌似就是导致这个问题的点)。
> > >
> > > Rui Li <[hidden email]> 于2020年8月17日周一 下午3:46写道:
> > >
> > > > 可能是打fat
> > > > jar的时候service文件被覆盖了?可以检查一下你jar包里面META-INF/services下面的文件,看是不是没有json的实现
> > > >
> > > > On Fri, Aug 14, 2020 at 7:13 PM 赵一旦 <[hidden email]> wrote:
> > > >
> > > > > 代码如下:
> > > > > // tEnv;
> > > > > tEnv.sqlUpdate("create table dr1(  " +
> > > > >         "  cid STRING,  " +
> > > > >         "  server_time BIGINT,  " +
> > > > >         "  d MAP<STRING, STRING>,  " +
> > > > >         "  process_time AS PROCTIME(),  " +
> > > > >         "  event_time AS TO_TIMESTAMP(FROM_UNIXTIME(server_time /
> > > 1000)),
> > > > > " +
> > > > >         "  WATERMARK FOR event_time AS event_time - INTERVAL '60'
> > > SECOND
> > > > > " +
> > > > >         ") WITH (  " +
> > > > >         "  'update-mode' = 'append',  " +
> > > > >         "  'connector.type' = 'kafka',  " +
> > > > >         "  'connector.version' = 'universal',  " +
> > > > >         "  'connector.topic' = 'antibot_dr1',  " +
> > > > >         "  'connector.startup-mode' = 'latest-offset',  " +
> > > > >         "  'connector.properties.zookeeper.connect' =
> > > > > 'yq01-sw-xxx03.yq01:8681',  " +
> > > > >         "  'connector.properties.bootstrap.servers' =
> > > > > 'yq01-sw-xxx03.yq01:8192',  " +
> > > > >         "  'format.type' = 'json'  " +
> > > > >         ")");
> > > > > Table t1 = tEnv.sqlQuery("select * from dr1");
> > > > >
> > > > > 我打包会把flink-json打包进去,最终结果包是test.jar。
> > > > >
> > > > > test.jar是个fat jar,相关依赖都有了。
> > > > >
> > > > > 然后我执行:flink run -c test.SQLWC1 --detached  test.jar 报错:
> > > > >
> > > > > Caused by:
> > org.apache.flink.table.api.NoMatchingTableFactoryException:
> > > > > Could not find a suitable table factory for
> > > > > 'org.apache.flink.table.factories.DeserializationSchemaFactory' in
> > > > > the classpath.
> > > > >
> > > > > 可是我flink-json.jar都打包进去了,居然还是报错。。。
> > > > >
> > > > > 解决方式,必须是执行 flink run -c test.SQLWC1 --detached  test.jar 这个命令的机器
> > > > >
> > > > > 上的flink的环境中有flink-json这个包。但实际上这个机器只作为提交,实际执行任务的集群是另一个机器。
> > > > >
> > > > > 搞不懂,FlinkSQL找依赖的过程到底啥情况,我fat jar打包进去的flink-json不会被考虑吗?
> > > > >
> > > >
> > > >
> > > > --
> > > > Best regards!
> > > > Rui Li
> > > >
> > >
> >
> >
> > --
> > Best regards!
> > Rui Li
> >
>


--
Best regards!
Rui Li
Reply | Threaded
Open this post in threaded view
|

Re: FlinkSQL tableEnv 依赖问题

nobleyd
不一样吧。-C 指定的是必须所有集群机器能访问的。sql-client指定的那个按照之前其他同学的说法是会被上传的。

Rui Li <[hidden email]> 于2020年8月17日周一 下午5:40写道:

> 对,这个文件每一行就是一个类名。另外你说的flink run指定额外的jar包应该可以通过-C参数来指定
>
> On Mon, Aug 17, 2020 at 5:38 PM 赵一旦 <[hidden email]> wrote:
>
> > 哦哦。也就是说本身这个文件内内容是支持一行一个这样的格式的是吧?
> >
> > Rui Li <[hidden email]> 于2020年8月17日周一 下午5:36写道:
> >
> > > 用shade plugin的时候可以指定service resource
> > > transformer,应该能把多个service文件merge起来。具体可以参考:
> > >
> > >
> >
> https://maven.apache.org/plugins/maven-shade-plugin/examples/resource-transformers.html#ServicesResourceTransformer
> > >
> > > On Mon, Aug 17, 2020 at 5:00 PM 赵一旦 <[hidden email]> wrote:
> > >
> > > >
> @RuiLi,你这个说法对的。我刚刚看了下,是这样的。因为我是shade掉的,倒是没丢失,但是多种connector和format相互覆盖了。
> > > >
> > > > 所以说,这就是问题了。flink-json,flink-csv这种包没办法shade进去看样子?
> > > >
> > > >
> > > >
> > >
> >
> 此外,如果我实现一个jar内同时提供flink-json和flink-csv的功能是不是当前这种机制也无法实现(为什么要依靠services目录下一个固定名字的文件来记录Factory实现类,这貌似就是导致这个问题的点)。
> > > >
> > > > Rui Li <[hidden email]> 于2020年8月17日周一 下午3:46写道:
> > > >
> > > > > 可能是打fat
> > > > >
> jar的时候service文件被覆盖了?可以检查一下你jar包里面META-INF/services下面的文件,看是不是没有json的实现
> > > > >
> > > > > On Fri, Aug 14, 2020 at 7:13 PM 赵一旦 <[hidden email]> wrote:
> > > > >
> > > > > > 代码如下:
> > > > > > // tEnv;
> > > > > > tEnv.sqlUpdate("create table dr1(  " +
> > > > > >         "  cid STRING,  " +
> > > > > >         "  server_time BIGINT,  " +
> > > > > >         "  d MAP<STRING, STRING>,  " +
> > > > > >         "  process_time AS PROCTIME(),  " +
> > > > > >         "  event_time AS TO_TIMESTAMP(FROM_UNIXTIME(server_time /
> > > > 1000)),
> > > > > > " +
> > > > > >         "  WATERMARK FOR event_time AS event_time - INTERVAL '60'
> > > > SECOND
> > > > > > " +
> > > > > >         ") WITH (  " +
> > > > > >         "  'update-mode' = 'append',  " +
> > > > > >         "  'connector.type' = 'kafka',  " +
> > > > > >         "  'connector.version' = 'universal',  " +
> > > > > >         "  'connector.topic' = 'antibot_dr1',  " +
> > > > > >         "  'connector.startup-mode' = 'latest-offset',  " +
> > > > > >         "  'connector.properties.zookeeper.connect' =
> > > > > > 'yq01-sw-xxx03.yq01:8681',  " +
> > > > > >         "  'connector.properties.bootstrap.servers' =
> > > > > > 'yq01-sw-xxx03.yq01:8192',  " +
> > > > > >         "  'format.type' = 'json'  " +
> > > > > >         ")");
> > > > > > Table t1 = tEnv.sqlQuery("select * from dr1");
> > > > > >
> > > > > > 我打包会把flink-json打包进去,最终结果包是test.jar。
> > > > > >
> > > > > > test.jar是个fat jar,相关依赖都有了。
> > > > > >
> > > > > > 然后我执行:flink run -c test.SQLWC1 --detached  test.jar 报错:
> > > > > >
> > > > > > Caused by:
> > > org.apache.flink.table.api.NoMatchingTableFactoryException:
> > > > > > Could not find a suitable table factory for
> > > > > > 'org.apache.flink.table.factories.DeserializationSchemaFactory'
> in
> > > > > > the classpath.
> > > > > >
> > > > > > 可是我flink-json.jar都打包进去了,居然还是报错。。。
> > > > > >
> > > > > > 解决方式,必须是执行 flink run -c test.SQLWC1 --detached  test.jar 这个命令的机器
> > > > > >
> > > > > > 上的flink的环境中有flink-json这个包。但实际上这个机器只作为提交,实际执行任务的集群是另一个机器。
> > > > > >
> > > > > > 搞不懂,FlinkSQL找依赖的过程到底啥情况,我fat jar打包进去的flink-json不会被考虑吗?
> > > > > >
> > > > >
> > > > >
> > > > > --
> > > > > Best regards!
> > > > > Rui Li
> > > > >
> > > >
> > >
> > >
> > > --
> > > Best regards!
> > > Rui Li
> > >
> >
>
>
> --
> Best regards!
> Rui Li
>
Reply | Threaded
Open this post in threaded view
|

Re: FlinkSQL tableEnv 依赖问题

Rui Li
没错,-C是需要每台机器上能访问到的

On Mon, Aug 17, 2020 at 5:56 PM 赵一旦 <[hidden email]> wrote:

> 不一样吧。-C 指定的是必须所有集群机器能访问的。sql-client指定的那个按照之前其他同学的说法是会被上传的。
>
> Rui Li <[hidden email]> 于2020年8月17日周一 下午5:40写道:
>
> > 对,这个文件每一行就是一个类名。另外你说的flink run指定额外的jar包应该可以通过-C参数来指定
> >
> > On Mon, Aug 17, 2020 at 5:38 PM 赵一旦 <[hidden email]> wrote:
> >
> > > 哦哦。也就是说本身这个文件内内容是支持一行一个这样的格式的是吧?
> > >
> > > Rui Li <[hidden email]> 于2020年8月17日周一 下午5:36写道:
> > >
> > > > 用shade plugin的时候可以指定service resource
> > > > transformer,应该能把多个service文件merge起来。具体可以参考:
> > > >
> > > >
> > >
> >
> https://maven.apache.org/plugins/maven-shade-plugin/examples/resource-transformers.html#ServicesResourceTransformer
> > > >
> > > > On Mon, Aug 17, 2020 at 5:00 PM 赵一旦 <[hidden email]> wrote:
> > > >
> > > > >
> > @RuiLi,你这个说法对的。我刚刚看了下,是这样的。因为我是shade掉的,倒是没丢失,但是多种connector和format相互覆盖了。
> > > > >
> > > > > 所以说,这就是问题了。flink-json,flink-csv这种包没办法shade进去看样子?
> > > > >
> > > > >
> > > > >
> > > >
> > >
> >
> 此外,如果我实现一个jar内同时提供flink-json和flink-csv的功能是不是当前这种机制也无法实现(为什么要依靠services目录下一个固定名字的文件来记录Factory实现类,这貌似就是导致这个问题的点)。
> > > > >
> > > > > Rui Li <[hidden email]> 于2020年8月17日周一 下午3:46写道:
> > > > >
> > > > > > 可能是打fat
> > > > > >
> > jar的时候service文件被覆盖了?可以检查一下你jar包里面META-INF/services下面的文件,看是不是没有json的实现
> > > > > >
> > > > > > On Fri, Aug 14, 2020 at 7:13 PM 赵一旦 <[hidden email]> wrote:
> > > > > >
> > > > > > > 代码如下:
> > > > > > > // tEnv;
> > > > > > > tEnv.sqlUpdate("create table dr1(  " +
> > > > > > >         "  cid STRING,  " +
> > > > > > >         "  server_time BIGINT,  " +
> > > > > > >         "  d MAP<STRING, STRING>,  " +
> > > > > > >         "  process_time AS PROCTIME(),  " +
> > > > > > >         "  event_time AS
> TO_TIMESTAMP(FROM_UNIXTIME(server_time /
> > > > > 1000)),
> > > > > > > " +
> > > > > > >         "  WATERMARK FOR event_time AS event_time - INTERVAL
> '60'
> > > > > SECOND
> > > > > > > " +
> > > > > > >         ") WITH (  " +
> > > > > > >         "  'update-mode' = 'append',  " +
> > > > > > >         "  'connector.type' = 'kafka',  " +
> > > > > > >         "  'connector.version' = 'universal',  " +
> > > > > > >         "  'connector.topic' = 'antibot_dr1',  " +
> > > > > > >         "  'connector.startup-mode' = 'latest-offset',  " +
> > > > > > >         "  'connector.properties.zookeeper.connect' =
> > > > > > > 'yq01-sw-xxx03.yq01:8681',  " +
> > > > > > >         "  'connector.properties.bootstrap.servers' =
> > > > > > > 'yq01-sw-xxx03.yq01:8192',  " +
> > > > > > >         "  'format.type' = 'json'  " +
> > > > > > >         ")");
> > > > > > > Table t1 = tEnv.sqlQuery("select * from dr1");
> > > > > > >
> > > > > > > 我打包会把flink-json打包进去,最终结果包是test.jar。
> > > > > > >
> > > > > > > test.jar是个fat jar,相关依赖都有了。
> > > > > > >
> > > > > > > 然后我执行:flink run -c test.SQLWC1 --detached  test.jar 报错:
> > > > > > >
> > > > > > > Caused by:
> > > > org.apache.flink.table.api.NoMatchingTableFactoryException:
> > > > > > > Could not find a suitable table factory for
> > > > > > > 'org.apache.flink.table.factories.DeserializationSchemaFactory'
> > in
> > > > > > > the classpath.
> > > > > > >
> > > > > > > 可是我flink-json.jar都打包进去了,居然还是报错。。。
> > > > > > >
> > > > > > > 解决方式,必须是执行 flink run -c test.SQLWC1 --detached  test.jar
> 这个命令的机器
> > > > > > >
> > > > > > > 上的flink的环境中有flink-json这个包。但实际上这个机器只作为提交,实际执行任务的集群是另一个机器。
> > > > > > >
> > > > > > > 搞不懂,FlinkSQL找依赖的过程到底啥情况,我fat jar打包进去的flink-json不会被考虑吗?
> > > > > > >
> > > > > >
> > > > > >
> > > > > > --
> > > > > > Best regards!
> > > > > > Rui Li
> > > > > >
> > > > >
> > > >
> > > >
> > > > --
> > > > Best regards!
> > > > Rui Li
> > > >
> > >
> >
> >
> > --
> > Best regards!
> > Rui Li
> >
>


--
Best regards!
Rui Li