FlinkSQL命令行方式提交sql任务的jar依赖问题

classic Classic list List threaded Threaded
6 messages Options
Reply | Threaded
Open this post in threaded view
|

FlinkSQL命令行方式提交sql任务的jar依赖问题

Zhao,Yi(SEC)
背景,我的flink集群(10机器)A,此外还有一个单独的flink机器(作为提交任务的机器)B。
现在比较混乱,哪些jar需要放到A,哪些放到B。


(1)     kafka ssl 证书,这个肯定需要放到A,因为kafka提供的属性只是ssl证书的路径,真正读取证书是任务开始运行之后,因此这个路径必须是集群A可访问,B是否可访问其实无所谓。

(2)     flink-sql情况下的kafak-client包。这个比较奇怪,从我的实验效果来看,貌似机器B上没有会报错,必须在启动sql-client的时候通过-j或-l参数指定jar路径,并包含kafka-client.jar,类似的还有flink-json等格式包。此外,这些包在集群A上都没有,但运行都正常。这么来看,我在sql-client中提交一个sql的时候,部分依赖应该是被打包并且上传到集群A的?这是怎么决定的呢?从我的sql中动态判定哪些包用到?还是咋的。



总结下,FlinkSQL使用sql-client命令行方式提交sql执行任务的场景下。最终运行时候哪些包需要用到sql-client端的包呢?

目前从实验来看Flink-json、各种connector的包都是用的sql-client的包。


Reply | Threaded
Open this post in threaded view
|

Re: FlinkSQL命令行方式提交sql任务的jar依赖问题

Jeff Zhang
你的10台机器是flink standalone还是 yarn集群 ?
如果是yarn集群,那么只要把依赖放到机器A就可以了,如果是standalone的话,就要在B集群上部署了依赖了。

另外你可以用zeppelin来提交flink sql 作业,zeppelin提供很多种添加第三方依赖的方法,具体可以参考文档
https://www.yuque.com/jeffzhangjianfeng/gldg8w/rn6g1s
或者加入钉钉群讨论,钉钉群号: 32803524


Zhao,Yi(SEC) <[hidden email]> 于2020年8月13日周四 下午1:02写道:

> 背景,我的flink集群(10机器)A,此外还有一个单独的flink机器(作为提交任务的机器)B。
> 现在比较混乱,哪些jar需要放到A,哪些放到B。
>
>
> (1)     kafka ssl
> 证书,这个肯定需要放到A,因为kafka提供的属性只是ssl证书的路径,真正读取证书是任务开始运行之后,因此这个路径必须是集群A可访问,B是否可访问其实无所谓。
>
> (2)
>  flink-sql情况下的kafak-client包。这个比较奇怪,从我的实验效果来看,貌似机器B上没有会报错,必须在启动sql-client的时候通过-j或-l参数指定jar路径,并包含kafka-client.jar,类似的还有flink-json等格式包。此外,这些包在集群A上都没有,但运行都正常。这么来看,我在sql-client中提交一个sql的时候,部分依赖应该是被打包并且上传到集群A的?这是怎么决定的呢?从我的sql中动态判定哪些包用到?还是咋的。
>
>
>
> 总结下,FlinkSQL使用sql-client命令行方式提交sql执行任务的场景下。最终运行时候哪些包需要用到sql-client端的包呢?
>
> 目前从实验来看Flink-json、各种connector的包都是用的sql-client的包。
>
>
>

--
Best Regards

Jeff Zhang
Reply | Threaded
Open this post in threaded view
|

Re: FlinkSQL命令行方式提交sql任务的jar依赖问题

Zhao,Yi(SEC)
A是10机器集群(HA模式,独立集群),B作为提交机器。
从我实验效果来看,我是先启动一个sql-client的cli,如下命令:
./bin/sql-client.sh embedded -l $(pwd)/libs_sql -l $(pwd)/libs_udf
其中libs_sql中有:flink-connector-kafka_2.12-1.10.0.jar  flink-connector-kafka-base_2.12-1.10.0.jar  flink-jdbc_2.12-1.10.0.jar  flink-json-1.10.0.jar。然后A集群所有机器没有加这些包(Flink部署目录lib中没有)。A集群上其他任务提交的包中jar应该不致于会影响到我sql提交的任务。
结论是,我libs_sql中没有flink-json、flink-connector-kafka等的时候,提交sql任务会报错。加了的时候,提交sql任务不报错。
所以感觉貌似提交sql任务会将启动sql-client时候指定的lib相关包都上传吗?
————————————————————————————————————————————————————————

在 2020/8/13 下午3:10,“Jeff Zhang”<[hidden email]> 写入:

    你的10台机器是flink standalone还是 yarn集群 ?
    如果是yarn集群,那么只要把依赖放到机器A就可以了,如果是standalone的话,就要在B集群上部署了依赖了。
   
    另外你可以用zeppelin来提交flink sql 作业,zeppelin提供很多种添加第三方依赖的方法,具体可以参考文档
    https://www.yuque.com/jeffzhangjianfeng/gldg8w/rn6g1s
    或者加入钉钉群讨论,钉钉群号: 32803524
   
   
    Zhao,Yi(SEC) <[hidden email]> 于2020年8月13日周四 下午1:02写道:
   
    > 背景,我的flink集群(10机器)A,此外还有一个单独的flink机器(作为提交任务的机器)B。
    > 现在比较混乱,哪些jar需要放到A,哪些放到B。
    >
    >
    > (1)     kafka ssl
    > 证书,这个肯定需要放到A,因为kafka提供的属性只是ssl证书的路径,真正读取证书是任务开始运行之后,因此这个路径必须是集群A可访问,B是否可访问其实无所谓。
    >
    > (2)
    >  flink-sql情况下的kafak-client包。这个比较奇怪,从我的实验效果来看,貌似机器B上没有会报错,必须在启动sql-client的时候通过-j或-l参数指定jar路径,并包含kafka-client.jar,类似的还有flink-json等格式包。此外,这些包在集群A上都没有,但运行都正常。这么来看,我在sql-client中提交一个sql的时候,部分依赖应该是被打包并且上传到集群A的?这是怎么决定的呢?从我的sql中动态判定哪些包用到?还是咋的。
    >
    >
    >
    > 总结下,FlinkSQL使用sql-client命令行方式提交sql执行任务的场景下。最终运行时候哪些包需要用到sql-client端的包呢?
    >
    > 目前从实验来看Flink-json、各种connector的包都是用的sql-client的包。
    >
    >
    >
   
    --
    Best Regards
   
    Jeff Zhang
   

Reply | Threaded
Open this post in threaded view
|

Re: FlinkSQL命令行方式提交sql任务的jar依赖问题

godfrey he
sql client 中通过 -j 或者 -l 的指定的包会被随着job提交的时候一起上传到jm。

Zhao,Yi(SEC) <[hidden email]> 于2020年8月13日周四 下午5:11写道:

> A是10机器集群(HA模式,独立集群),B作为提交机器。
> 从我实验效果来看,我是先启动一个sql-client的cli,如下命令:
> ./bin/sql-client.sh embedded -l $(pwd)/libs_sql -l $(pwd)/libs_udf
> 其中libs_sql中有:flink-connector-kafka_2.12-1.10.0.jar
> flink-connector-kafka-base_2.12-1.10.0.jar  flink-jdbc_2.12-1.10.0.jar
> flink-json-1.10.0.jar。然后A集群所有机器没有加这些包(Flink部署目录lib中没有)。A集群上其他任务提交的包中jar应该不致于会影响到我sql提交的任务。
>
> 结论是,我libs_sql中没有flink-json、flink-connector-kafka等的时候,提交sql任务会报错。加了的时候,提交sql任务不报错。
> 所以感觉貌似提交sql任务会将启动sql-client时候指定的lib相关包都上传吗?
> ————————————————————————————————————————————————————————
>
> 在 2020/8/13 下午3:10,“Jeff Zhang”<[hidden email]> 写入:
>
>     你的10台机器是flink standalone还是 yarn集群 ?
>     如果是yarn集群,那么只要把依赖放到机器A就可以了,如果是standalone的话,就要在B集群上部署了依赖了。
>
>     另外你可以用zeppelin来提交flink sql 作业,zeppelin提供很多种添加第三方依赖的方法,具体可以参考文档
>     https://www.yuque.com/jeffzhangjianfeng/gldg8w/rn6g1s
>     或者加入钉钉群讨论,钉钉群号: 32803524
>
>
>     Zhao,Yi(SEC) <[hidden email]> 于2020年8月13日周四 下午1:02写道:
>
>     > 背景,我的flink集群(10机器)A,此外还有一个单独的flink机器(作为提交任务的机器)B。
>     > 现在比较混乱,哪些jar需要放到A,哪些放到B。
>     >
>     >
>     > (1)     kafka ssl
>     >
> 证书,这个肯定需要放到A,因为kafka提供的属性只是ssl证书的路径,真正读取证书是任务开始运行之后,因此这个路径必须是集群A可访问,B是否可访问其实无所谓。
>     >
>     > (2)
>     >
> flink-sql情况下的kafak-client包。这个比较奇怪,从我的实验效果来看,貌似机器B上没有会报错,必须在启动sql-client的时候通过-j或-l参数指定jar路径,并包含kafka-client.jar,类似的还有flink-json等格式包。此外,这些包在集群A上都没有,但运行都正常。这么来看,我在sql-client中提交一个sql的时候,部分依赖应该是被打包并且上传到集群A的?这是怎么决定的呢?从我的sql中动态判定哪些包用到?还是咋的。
>     >
>     >
>     >
>     >
> 总结下,FlinkSQL使用sql-client命令行方式提交sql执行任务的场景下。最终运行时候哪些包需要用到sql-client端的包呢?
>     >
>     > 目前从实验来看Flink-json、各种connector的包都是用的sql-client的包。
>     >
>     >
>     >
>
>     --
>     Best Regards
>
>     Jeff Zhang
>
>
>
Reply | Threaded
Open this post in threaded view
|

Re: FlinkSQL命令行方式提交sql任务的jar依赖问题

Zhao,Yi(SEC)
分析个报错,报错如下:
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.TableSourceFactory' in
the classpath.

Reason: Required context properties mismatch.

The matching candidates:
org.apache.flink.table.sources.CsvAppendTableSourceFactory
Mismatched properties:
'connector.type' expects 'filesystem', but is 'kafka'
'format.type' expects 'csv', but is 'json'

The following properties are requested:
从报错来看,是需要json的format,但实际只有csv。因为缺少json的format。
这个实验,是我将相关所有jar都放到集群的flink的lib目录并重启了集群。
但是提交sql(即执行sql-client.sh命令)的机器上没有这些依赖,报错如上。

所以,这个根据我的表定义去找对应的format,以及connector等的过程是在提交端做的吗?
还有一个更奇怪的,就算format,connector相关是提交端做的,但是我kafka的ssl证书路径的读取理论上肯定应该是在任务执行时候才会做,但当我执行select * from xxx提交sql之后马上报错了,报错为:
Caused by: java.lang.ClassNotFoundException: org.apache.kafka.common.serialization.ByteArrayDeserializer
        at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
Kafka的序列化类理论上是作为kafkasource被创建时候的properties传入,然后kafkaConsumer执行期间才会发现这个class不存在吧。

___________________________________________________________________________________________________________
在 2020/8/14 上午9:44,“godfrey he”<[hidden email]> 写入:

    sql client 中通过 -j 或者 -l 的指定的包会被随着job提交的时候一起上传到jm。
   
    Zhao,Yi(SEC) <[hidden email]> 于2020年8月13日周四 下午5:11写道:
   
    > A是10机器集群(HA模式,独立集群),B作为提交机器。
    > 从我实验效果来看,我是先启动一个sql-client的cli,如下命令:
    > ./bin/sql-client.sh embedded -l $(pwd)/libs_sql -l $(pwd)/libs_udf
    > 其中libs_sql中有:flink-connector-kafka_2.12-1.10.0.jar
    > flink-connector-kafka-base_2.12-1.10.0.jar  flink-jdbc_2.12-1.10.0.jar
    > flink-json-1.10.0.jar。然后A集群所有机器没有加这些包(Flink部署目录lib中没有)。A集群上其他任务提交的包中jar应该不致于会影响到我sql提交的任务。
    >
    > 结论是,我libs_sql中没有flink-json、flink-connector-kafka等的时候,提交sql任务会报错。加了的时候,提交sql任务不报错。
    > 所以感觉貌似提交sql任务会将启动sql-client时候指定的lib相关包都上传吗?
    > ————————————————————————————————————————————————————————
    >
    > 在 2020/8/13 下午3:10,“Jeff Zhang”<[hidden email]> 写入:
    >
    >     你的10台机器是flink standalone还是 yarn集群 ?
    >     如果是yarn集群,那么只要把依赖放到机器A就可以了,如果是standalone的话,就要在B集群上部署了依赖了。
    >
    >     另外你可以用zeppelin来提交flink sql 作业,zeppelin提供很多种添加第三方依赖的方法,具体可以参考文档
    >     https://www.yuque.com/jeffzhangjianfeng/gldg8w/rn6g1s
    >     或者加入钉钉群讨论,钉钉群号: 32803524
    >
    >
    >     Zhao,Yi(SEC) <[hidden email]> 于2020年8月13日周四 下午1:02写道:
    >
    >     > 背景,我的flink集群(10机器)A,此外还有一个单独的flink机器(作为提交任务的机器)B。
    >     > 现在比较混乱,哪些jar需要放到A,哪些放到B。
    >     >
    >     >
    >     > (1)     kafka ssl
    >     >
    > 证书,这个肯定需要放到A,因为kafka提供的属性只是ssl证书的路径,真正读取证书是任务开始运行之后,因此这个路径必须是集群A可访问,B是否可访问其实无所谓。
    >     >
    >     > (2)
    >     >
    > flink-sql情况下的kafak-client包。这个比较奇怪,从我的实验效果来看,貌似机器B上没有会报错,必须在启动sql-client的时候通过-j或-l参数指定jar路径,并包含kafka-client.jar,类似的还有flink-json等格式包。此外,这些包在集群A上都没有,但运行都正常。这么来看,我在sql-client中提交一个sql的时候,部分依赖应该是被打包并且上传到集群A的?这是怎么决定的呢?从我的sql中动态判定哪些包用到?还是咋的。
    >     >
    >     >
    >     >
    >     >
    > 总结下,FlinkSQL使用sql-client命令行方式提交sql执行任务的场景下。最终运行时候哪些包需要用到sql-client端的包呢?
    >     >
    >     > 目前从实验来看Flink-json、各种connector的包都是用的sql-client的包。
    >     >
    >     >
    >     >
    >
    >     --
    >     Best Regards
    >
    >     Jeff Zhang
    >
    >
    >
   

Reply | Threaded
Open this post in threaded view
|

Re: FlinkSQL命令行方式提交sql任务的jar依赖问题

Zhao,Yi(SEC)
补充:
刚刚翻了下源码,kafka那个直到原因了,见FlinkKafkaConsumer的288行,限定了必须是ByteArrayDeserializer,而且引用到了ByteArrayDeserializer类,这个是在new KafkaConsumer的过程就执行到的,所以这个依赖是提交端需要的。

按照 <[hidden email]> 的讲法,flink-sql按照-j或-l指定的包会被上传,这个倒也合理,毕竟有些任务特定需要一些包,提供这个功能肯定有用。
但像connector,json,csv这种非常通用的包感觉应该统一放入集群就好,但实际按照这个情况来看无法做到。
因为即使我把这些包统一放到了集群,实际提交段还是需要这些包,因为没有这些包提交sql时就直接报错了,于是还是需要通过-j或-l指定,然后进一步游会被上传?所以说,此处又涉及到一个flink集群上的包和sql-client提交的包重复的问题,一致还好,不一致情况下哪个优先呢?
___________

在 2020/8/14 上午10:46,“Zhao,Yi(SEC)”<[hidden email]> 写入:

    分析个报错,报错如下:
    [ERROR] Could not execute SQL statement. Reason:
    org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.TableSourceFactory' in
    the classpath.
   
    Reason: Required context properties mismatch.
   
    The matching candidates:
    org.apache.flink.table.sources.CsvAppendTableSourceFactory
    Mismatched properties:
    'connector.type' expects 'filesystem', but is 'kafka'
    'format.type' expects 'csv', but is 'json'
   
    The following properties are requested:
    从报错来看,是需要json的format,但实际只有csv。因为缺少json的format。
    这个实验,是我将相关所有jar都放到集群的flink的lib目录并重启了集群。
    但是提交sql(即执行sql-client.sh命令)的机器上没有这些依赖,报错如上。
   
    所以,这个根据我的表定义去找对应的format,以及connector等的过程是在提交端做的吗?
    还有一个更奇怪的,就算format,connector相关是提交端做的,但是我kafka的ssl证书路径的读取理论上肯定应该是在任务执行时候才会做,但当我执行select * from xxx提交sql之后马上报错了,报错为:
    Caused by: java.lang.ClassNotFoundException: org.apache.kafka.common.serialization.ByteArrayDeserializer
            at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
            at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
            at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
    Kafka的序列化类理论上是作为kafkasource被创建时候的properties传入,然后kafkaConsumer执行期间才会发现这个class不存在吧。
   
    ___________________________________________________________________________________________________________
    在 2020/8/14 上午9:44,“godfrey he”<[hidden email]> 写入:
   
        sql client 中通过 -j 或者 -l 的指定的包会被随着job提交的时候一起上传到jm。
       
        Zhao,Yi(SEC) <[hidden email]> 于2020年8月13日周四 下午5:11写道:
       
        > A是10机器集群(HA模式,独立集群),B作为提交机器。
        > 从我实验效果来看,我是先启动一个sql-client的cli,如下命令:
        > ./bin/sql-client.sh embedded -l $(pwd)/libs_sql -l $(pwd)/libs_udf
        > 其中libs_sql中有:flink-connector-kafka_2.12-1.10.0.jar
        > flink-connector-kafka-base_2.12-1.10.0.jar  flink-jdbc_2.12-1.10.0.jar
        > flink-json-1.10.0.jar。然后A集群所有机器没有加这些包(Flink部署目录lib中没有)。A集群上其他任务提交的包中jar应该不致于会影响到我sql提交的任务。
        >
        > 结论是,我libs_sql中没有flink-json、flink-connector-kafka等的时候,提交sql任务会报错。加了的时候,提交sql任务不报错。
        > 所以感觉貌似提交sql任务会将启动sql-client时候指定的lib相关包都上传吗?
        > ————————————————————————————————————————————————————————
        >
        > 在 2020/8/13 下午3:10,“Jeff Zhang”<[hidden email]> 写入:
        >
        >     你的10台机器是flink standalone还是 yarn集群 ?
        >     如果是yarn集群,那么只要把依赖放到机器A就可以了,如果是standalone的话,就要在B集群上部署了依赖了。
        >
        >     另外你可以用zeppelin来提交flink sql 作业,zeppelin提供很多种添加第三方依赖的方法,具体可以参考文档
        >     https://www.yuque.com/jeffzhangjianfeng/gldg8w/rn6g1s
        >     或者加入钉钉群讨论,钉钉群号: 32803524
        >
        >
        >     Zhao,Yi(SEC) <[hidden email]> 于2020年8月13日周四 下午1:02写道:
        >
        >     > 背景,我的flink集群(10机器)A,此外还有一个单独的flink机器(作为提交任务的机器)B。
        >     > 现在比较混乱,哪些jar需要放到A,哪些放到B。
        >     >
        >     >
        >     > (1)     kafka ssl
        >     >
        > 证书,这个肯定需要放到A,因为kafka提供的属性只是ssl证书的路径,真正读取证书是任务开始运行之后,因此这个路径必须是集群A可访问,B是否可访问其实无所谓。
        >     >
        >     > (2)
        >     >
        > flink-sql情况下的kafak-client包。这个比较奇怪,从我的实验效果来看,貌似机器B上没有会报错,必须在启动sql-client的时候通过-j或-l参数指定jar路径,并包含kafka-client.jar,类似的还有flink-json等格式包。此外,这些包在集群A上都没有,但运行都正常。这么来看,我在sql-client中提交一个sql的时候,部分依赖应该是被打包并且上传到集群A的?这是怎么决定的呢?从我的sql中动态判定哪些包用到?还是咋的。
        >     >
        >     >
        >     >
        >     >
        > 总结下,FlinkSQL使用sql-client命令行方式提交sql执行任务的场景下。最终运行时候哪些包需要用到sql-client端的包呢?
        >     >
        >     > 目前从实验来看Flink-json、各种connector的包都是用的sql-client的包。
        >     >
        >     >
        >     >
        >
        >     --
        >     Best Regards
        >
        >     Jeff Zhang
        >
        >
        >