今天在使用Flink 1.11.3版本使用Flink SQL将kafka中数据导入到HDFS上时提示如下的错误
Caused by: org.apache.flink.table.api.TableException: Could not load service provider for factories.
at org.apache.flink.table.factories.FactoryUtil.discoverFactories(FactoryUtil.java:346)
at org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:221)
at org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:326)
at org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:118)
... 39 more
Caused by: java.util.ServiceConfigurationError: org.apache.flink.table.factories.Factory: Provider org.apache.flink.core.fs.HadoopFsFactory not found
at java.util.ServiceLoader.fail(ServiceLoader.java:239)
at java.util.ServiceLoader.access$300(ServiceLoader.java:185)
at java.util.ServiceLoader$LazyIterator.nextService(ServiceLoader.java:372)
at java.util.ServiceLoader$LazyIterator.next(ServiceLoader.java:404)
at java.util.ServiceLoader$1.next(ServiceLoader.java:480)
at java.util.Iterator.forEachRemaining(Iterator.java:116)
SQL语句为:
CREATE TABLE hdfs_table (
content STRING,
dt STRING,
h STRING
) PARTITIONED BY (dt, h) WITH (
'connector' = 'filesystem',
'path '= 'hdfs://hdfsCluster/tmp/zyh_test',
'format' = 'csv'
);
出错后我们在代码中没看到有HDFS实现DynamicTableSinkFactory的相关类,是不是FlinkSQL不支持写入到HDFS中?通过Hive的connector来实现?
在测试前我们按照官方文档如下的操作,添加HDFS的相关类
org.apache.flink.table.factories.Factory 中为:org.apache.flink.core.fs.HadoopFsFactory
org.apache.flink.table.factories.TableFactory中为:org.apache.flink.table.filesystem.FileSystemTableFactory 但是添加后报上述错误
添加新的可插拔文件系统实现
文件系统通过org.apache.flink.core.fs.FileSystem类表示,该类捕获访问和修改该文件系统中文件和对象的方式。
要添加新的文件系统:
添加文件系统实现,它是的子类org.apache.flink.core.fs.FileSystem。
添加一个实例化该文件系统并声明用于注册FileSystem的方案的工厂。这必须是的子类org.apache.flink.core.fs.FileSystemFactory。
添加服务条目。创建一个META-INF/services/org.apache.flink.core.fs.FileSystemFactory包含文件系统工厂类的类名的文件(有关更多详细信息,请参见Java Service Loader文档)。
在插件发现期间,文件系统工厂类将由专用的Java类加载器加载,以避免与其他插件和Flink组件发生类冲突。在文件系统实例化和文件系统操作调用期间,应使用相同的类加载器。
[hidden email]