有人帮我看下这个问题吗,谢谢
|
Hi,我不能加载你邮件中的图片。从下面的报错看起来是因为找不到 match 的connector。可以检查一下 DDL 中的 with 属性是否正确。
在 2020-05-25 00:11:16,"macia kk" <[hidden email]> 写道: 有人帮我看下这个问题吗,谢谢 org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: findAndCreateTableSource failed. Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.TableSourceFactory' in the classpath. Reason: Required context properties mismatch. The matching candidates: org.apache.flink.table.sources.CsvAppendTableSourceFactory Mismatched properties: 'connector.type' expects 'filesystem', but is 'kafka' 'format.type' expects 'csv', but is 'json' The following properties are requested: connector.properties.bootstrap.servers=ip-10-128-145-1.idata-server.shopee.io:9092connector.properties.group.id=keystats_aripay connector.property-version=1 connector.startup-mode=latest-offset connector.topic=ebisu_wallet_id_db_mirror_v1 connector.type=kafka format.property-version=1 format.type=json schema.0.data-type=INT schema.0.name=ts schema.1.data-type=VARCHAR(2147483647) schema.1.name=table schema.2.data-type=VARCHAR(2147483647) schema.2.name=database update-mode=append The following factories have been considered: org.apache.flink.table.sources.CsvBatchTableSourceFactory org.apache.flink.table.sources.CsvAppendTableSourceFactory org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory at org.apache.flink.table.factories.TableFactoryService.filterByContext(TableFactoryService.java:322) at org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:190) at org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:143) at org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:96) at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:52) ... 39 more |
感谢,我在之前的邮件记录中搜索到了答案。我现在遇到了新的问题,卡主了好久:
Table API, sink to Kafka val result = bsTableEnv.sqlQuery("SELECT * FROM " + "pppppppp") bsTableEnv .connect( new Kafka() .version("0.11") // required: valid connector versions are .topic("aaa") // required: topic name from which the table is read .property("zookeeper.connect", "xxx") .property("bootstrap.servers", "yyy") ) .withFormat(new Json()) .withSchema(new Schema() .field("ts", INT()) .field("table", STRING()) .field("database", STRING()) ) .createTemporaryTable("zzzzz") result.insertInto("mmmmm") Error: java.lang.NoSuchMethodError: org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.<init>(Ljava/lang/String;Lorg/apache/flink/streaming/util/serialization/KeyedSerializationSchema;Ljava/util/Properties;Ljava/util/Optional;)V at org.apache.flink.streaming.connectors.kafka.Kafka011TableSink.createKafkaProducer(Kafka011TableSink.java:58) at org.apache.flink.streaming.connectors.kafka.KafkaTableSinkBase.consumeDataStream(KafkaTableSinkBase.java:95) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:140) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48) at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:48) at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:60) at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:59) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.Iterator$class.foreach(Iterator.scala:891) at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at scala.collection.AbstractIterable.foreach(Iterable.scala:54) at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at scala.collection.AbstractTraversable.map(Traversable.scala:104) at org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:59) at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:153) at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:682) at org.apache.flink.table.api.internal.TableEnvironmentImpl.insertIntoInternal(TableEnvironmentImpl.java:355) at org.apache.flink.table.api.internal.TableEnvironmentImpl.insertInto(TableEnvironmentImpl.java:334) at org.apache.flink.table.api.internal.TableImpl.insertInto(TableImpl.java:411) at com.shopee.data.ordermart.airpay_v3.AirpayV3Flink$.createPipeline(AirpayV3Flink.scala:74) at com.shopee.data.ordermart.airpay_v3.AirpayV3Flink$.main(AirpayV3Flink.scala:30) at com.shopee.data.ordermart.airpay_v3.AirpayV3Flink.main(AirpayV3Flink.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321) at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205) at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138) at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664) at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213) at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895) at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:422) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1982) at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41) at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968) 麻烦帮我看下,谢谢 Lijie Wang <[hidden email]> 于2020年5月25日周一 上午12:34写道: > Hi,我不能加载你邮件中的图片。从下面的报错看起来是因为找不到 match 的connector。可以检查一下 DDL 中的 with 属性是否正确。 > > > > 在 2020-05-25 00:11:16,"macia kk" <[hidden email]> 写道: > > 有人帮我看下这个问题吗,谢谢 > > > > > > > org.apache.flink.client.program.ProgramInvocationException: The main > method caused an error: findAndCreateTableSource failed. > Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: > Could not find a suitable table factory for > 'org.apache.flink.table.factories.TableSourceFactory' in > the classpath. > Reason: Required context properties mismatch. > > The matching candidates: > org.apache.flink.table.sources.CsvAppendTableSourceFactory > Mismatched properties: > 'connector.type' expects 'filesystem', but is 'kafka' > 'format.type' expects 'csv', but is 'json' > > The following properties are requested: > connector.properties.bootstrap.servers=ip-10-128- > 145-1.idata-server.shopee.io:9092connector.properties.group.id > =keystats_aripay > connector.property-version=1 > connector.startup-mode=latest-offset > connector.topic=ebisu_wallet_id_db_mirror_v1 > connector.type=kafka > format.property-version=1 > format.type=json > schema.0.data-type=INT > schema.0.name=ts > schema.1.data-type=VARCHAR(2147483647) > schema.1.name=table > schema.2.data-type=VARCHAR(2147483647) > schema.2.name=database > update-mode=append > > The following factories have been considered: > org.apache.flink.table.sources.CsvBatchTableSourceFactory > org.apache.flink.table.sources.CsvAppendTableSourceFactory > org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory > at > org.apache.flink.table.factories.TableFactoryService.filterByContext(TableFactoryService.java:322) > at > org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:190) > at > org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:143) > at > org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:96) > at > org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:52) > ... 39 more > |
Hi,
你使用的kafka connector的版本是0.11的吗?报错看起来有点像版本不对 Best, Leonard Xu > 在 2020年5月25日,02:44,macia kk <[hidden email]> 写道: > > 感谢,我在之前的邮件记录中搜索到了答案。我现在遇到了新的问题,卡主了好久: > > Table API, sink to Kafka > > val result = bsTableEnv.sqlQuery("SELECT * FROM " + "pppppppp") > > bsTableEnv > .connect( > new Kafka() > .version("0.11") // required: valid connector versions are > .topic("aaa") // required: topic name from which the table is read > .property("zookeeper.connect", "xxx") > .property("bootstrap.servers", "yyy") > ) > .withFormat(new Json()) > .withSchema(new Schema() > .field("ts", INT()) > .field("table", STRING()) > .field("database", STRING()) > ) > .createTemporaryTable("zzzzz") > > result.insertInto("mmmmm") > > Error: > > java.lang.NoSuchMethodError: > org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.<init>(Ljava/lang/String;Lorg/apache/flink/streaming/util/serialization/KeyedSerializationSchema;Ljava/util/Properties;Ljava/util/Optional;)V > at org.apache.flink.streaming.connectors.kafka.Kafka011TableSink.createKafkaProducer(Kafka011TableSink.java:58) > at org.apache.flink.streaming.connectors.kafka.KafkaTableSinkBase.consumeDataStream(KafkaTableSinkBase.java:95) > at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:140) > at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48) > at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58) > at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:48) > at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:60) > at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:59) > at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at scala.collection.Iterator$class.foreach(Iterator.scala:891) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) > at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) > at scala.collection.AbstractIterable.foreach(Iterable.scala:54) > at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) > at scala.collection.AbstractTraversable.map(Traversable.scala:104) > at org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:59) > at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:153) > at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:682) > at org.apache.flink.table.api.internal.TableEnvironmentImpl.insertIntoInternal(TableEnvironmentImpl.java:355) > at org.apache.flink.table.api.internal.TableEnvironmentImpl.insertInto(TableEnvironmentImpl.java:334) > at org.apache.flink.table.api.internal.TableImpl.insertInto(TableImpl.java:411) > at com.shopee.data.ordermart.airpay_v3.AirpayV3Flink$.createPipeline(AirpayV3Flink.scala:74) > at com.shopee.data.ordermart.airpay_v3.AirpayV3Flink$.main(AirpayV3Flink.scala:30) > at com.shopee.data.ordermart.airpay_v3.AirpayV3Flink.main(AirpayV3Flink.scala) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321) > at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205) > at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138) > at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664) > at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213) > at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895) > at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968) > at java.security.AccessController.doPrivileged(Native Method) > at javax.security.auth.Subject.doAs(Subject.java:422) > at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1982) > at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41) > at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968) > > > 麻烦帮我看下,谢谢 > > Lijie Wang <[hidden email]> 于2020年5月25日周一 上午12:34写道: > >> Hi,我不能加载你邮件中的图片。从下面的报错看起来是因为找不到 match 的connector。可以检查一下 DDL 中的 with 属性是否正确。 >> >> >> >> 在 2020-05-25 00:11:16,"macia kk" <[hidden email]> 写道: >> >> 有人帮我看下这个问题吗,谢谢 >> >> >> >> >> >> >> org.apache.flink.client.program.ProgramInvocationException: The main >> method caused an error: findAndCreateTableSource failed. >> Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: >> Could not find a suitable table factory for >> 'org.apache.flink.table.factories.TableSourceFactory' in >> the classpath. >> Reason: Required context properties mismatch. >> >> The matching candidates: >> org.apache.flink.table.sources.CsvAppendTableSourceFactory >> Mismatched properties: >> 'connector.type' expects 'filesystem', but is 'kafka' >> 'format.type' expects 'csv', but is 'json' >> >> The following properties are requested: >> connector.properties.bootstrap.servers=ip-10-128- >> 145-1.idata-server.shopee.io:9092connector.properties.group.id >> =keystats_aripay >> connector.property-version=1 >> connector.startup-mode=latest-offset >> connector.topic=ebisu_wallet_id_db_mirror_v1 >> connector.type=kafka >> format.property-version=1 >> format.type=json >> schema.0.data-type=INT >> schema.0.name=ts >> schema.1.data-type=VARCHAR(2147483647) >> schema.1.name=table >> schema.2.data-type=VARCHAR(2147483647) >> schema.2.name=database >> update-mode=append >> >> The following factories have been considered: >> org.apache.flink.table.sources.CsvBatchTableSourceFactory >> org.apache.flink.table.sources.CsvAppendTableSourceFactory >> org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory >> at >> org.apache.flink.table.factories.TableFactoryService.filterByContext(TableFactoryService.java:322) >> at >> org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:190) >> at >> org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:143) >> at >> org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:96) >> at >> org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:52) >> ... 39 more >> |
built.sbt
val flinkVersion = "1.10.0" libraryDependencies ++= Seq( "org.apache.flink" %% "flink-streaming-scala" % flinkVersion , "org.apache.flink" %% "flink-scala" % flinkVersion, "org.apache.flink" %% "flink-statebackend-rocksdb" % flinkVersion, "org.apache.flink" % "flink-table-common" % flinkVersion, "org.apache.flink" %% "flink-table-api-scala" % flinkVersion, "org.apache.flink" %% "flink-table-api-scala-bridge" % flinkVersion, "org.apache.flink" %% "flink-table-planner-blink" % flinkVersion % "provided", "org.apache.flink" %% "flink-connector-kafka" % flinkVersion, "org.apache.flink" %% "flink-sql-connector-kafka-0.11" % flinkVersion, // <<<<<<<<<<<<<<<<<<<<< Kafka 0.11 "org.apache.flink" % "flink-json" % flinkVersion ) Leonard Xu <[hidden email]> 于2020年5月25日周一 上午9:33写道: > Hi, > 你使用的kafka connector的版本是0.11的吗?报错看起来有点像版本不对 > > Best, > Leonard Xu > > > 在 2020年5月25日,02:44,macia kk <[hidden email]> 写道: > > > > 感谢,我在之前的邮件记录中搜索到了答案。我现在遇到了新的问题,卡主了好久: > > > > Table API, sink to Kafka > > > > val result = bsTableEnv.sqlQuery("SELECT * FROM " + "pppppppp") > > > > bsTableEnv > > .connect( > > new Kafka() > > .version("0.11") // required: valid connector versions are > > .topic("aaa") // required: topic name from which the table is > read > > .property("zookeeper.connect", "xxx") > > .property("bootstrap.servers", "yyy") > > ) > > .withFormat(new Json()) > > .withSchema(new Schema() > > .field("ts", INT()) > > .field("table", STRING()) > > .field("database", STRING()) > > ) > > .createTemporaryTable("zzzzz") > > > > result.insertInto("mmmmm") > > > > Error: > > > > java.lang.NoSuchMethodError: > > > org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.<init>(Ljava/lang/String;Lorg/apache/flink/streaming/util/serialization/KeyedSerializationSchema;Ljava/util/Properties;Ljava/util/Optional;)V > > at > org.apache.flink.streaming.connectors.kafka.Kafka011TableSink.createKafkaProducer(Kafka011TableSink.java:58) > > at > org.apache.flink.streaming.connectors.kafka.KafkaTableSinkBase.consumeDataStream(KafkaTableSinkBase.java:95) > > at > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:140) > > at > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48) > > at > org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58) > > at > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:48) > > at > org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:60) > > at > org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:59) > > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > > at scala.collection.Iterator$class.foreach(Iterator.scala:891) > > at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) > > at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) > > at scala.collection.AbstractIterable.foreach(Iterable.scala:54) > > at > scala.collection.TraversableLike$class.map(TraversableLike.scala:234) > > at scala.collection.AbstractTraversable.map(Traversable.scala:104) > > at > org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:59) > > at > org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:153) > > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:682) > > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.insertIntoInternal(TableEnvironmentImpl.java:355) > > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.insertInto(TableEnvironmentImpl.java:334) > > at > org.apache.flink.table.api.internal.TableImpl.insertInto(TableImpl.java:411) > > at > com.shopee.data.ordermart.airpay_v3.AirpayV3Flink$.createPipeline(AirpayV3Flink.scala:74) > > at > com.shopee.data.ordermart.airpay_v3.AirpayV3Flink$.main(AirpayV3Flink.scala:30) > > at > com.shopee.data.ordermart.airpay_v3.AirpayV3Flink.main(AirpayV3Flink.scala) > > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > > at java.lang.reflect.Method.invoke(Method.java:498) > > at > org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321) > > at > org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205) > > at > org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138) > > at > org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664) > > at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213) > > at > org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895) > > at > org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968) > > at java.security.AccessController.doPrivileged(Native Method) > > at javax.security.auth.Subject.doAs(Subject.java:422) > > at > org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1982) > > at > org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41) > > at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968) > > > > > > 麻烦帮我看下,谢谢 > > > > Lijie Wang <[hidden email]> 于2020年5月25日周一 上午12:34写道: > > > >> Hi,我不能加载你邮件中的图片。从下面的报错看起来是因为找不到 match 的connector。可以检查一下 DDL 中的 with > 属性是否正确。 > >> > >> > >> > >> 在 2020-05-25 00:11:16,"macia kk" <[hidden email]> 写道: > >> > >> 有人帮我看下这个问题吗,谢谢 > >> > >> > >> > >> > >> > >> > >> org.apache.flink.client.program.ProgramInvocationException: The main > >> method caused an error: findAndCreateTableSource failed. > >> Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: > >> Could not find a suitable table factory for > >> 'org.apache.flink.table.factories.TableSourceFactory' in > >> the classpath. > >> Reason: Required context properties mismatch. > >> > >> The matching candidates: > >> org.apache.flink.table.sources.CsvAppendTableSourceFactory > >> Mismatched properties: > >> 'connector.type' expects 'filesystem', but is 'kafka' > >> 'format.type' expects 'csv', but is 'json' > >> > >> The following properties are requested: > >> connector.properties.bootstrap.servers=ip-10-128- > >> 145-1.idata-server.shopee.io:9092connector.properties.group.id > >> =keystats_aripay > >> connector.property-version=1 > >> connector.startup-mode=latest-offset > >> connector.topic=ebisu_wallet_id_db_mirror_v1 > >> connector.type=kafka > >> format.property-version=1 > >> format.type=json > >> schema.0.data-type=INT > >> schema.0.name=ts > >> schema.1.data-type=VARCHAR(2147483647) > >> schema.1.name=table > >> schema.2.data-type=VARCHAR(2147483647) > >> schema.2.name=database > >> update-mode=append > >> > >> The following factories have been considered: > >> org.apache.flink.table.sources.CsvBatchTableSourceFactory > >> org.apache.flink.table.sources.CsvAppendTableSourceFactory > >> org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory > >> at > >> > org.apache.flink.table.factories.TableFactoryService.filterByContext(TableFactoryService.java:322) > >> at > >> > org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:190) > >> at > >> > org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:143) > >> at > >> > org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:96) > >> at > >> > org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:52) > >> ... 39 more > >> > > |
对了还有个问题,我之前看文档使用 `flink-connector-kafka_2.11`一直都无法运行,后来看别人也遇到这道这个问题,改成
`flink-sql-connector-kafka-0.11` 才可以运行,这两个有什么区别,如果不一样的话,对于 table&SQL API 最好标明一下用后者 macia kk <[hidden email]> 于2020年5月25日周一 上午10:05写道: > built.sbt > > val flinkVersion = "1.10.0" > libraryDependencies ++= Seq( > "org.apache.flink" %% "flink-streaming-scala" % flinkVersion , > "org.apache.flink" %% "flink-scala" % flinkVersion, > "org.apache.flink" %% "flink-statebackend-rocksdb" % flinkVersion, > > "org.apache.flink" % "flink-table-common" % flinkVersion, > "org.apache.flink" %% "flink-table-api-scala" % flinkVersion, > "org.apache.flink" %% "flink-table-api-scala-bridge" % flinkVersion, > "org.apache.flink" %% "flink-table-planner-blink" % flinkVersion % "provided", > > "org.apache.flink" %% "flink-connector-kafka" % flinkVersion, > "org.apache.flink" %% "flink-sql-connector-kafka-0.11" % flinkVersion, // <<<<<<<<<<<<<<<<<<<<< Kafka 0.11 > "org.apache.flink" % "flink-json" % flinkVersion > ) > > > Leonard Xu <[hidden email]> 于2020年5月25日周一 上午9:33写道: > >> Hi, >> 你使用的kafka connector的版本是0.11的吗?报错看起来有点像版本不对 >> >> Best, >> Leonard Xu >> >> > 在 2020年5月25日,02:44,macia kk <[hidden email]> 写道: >> > >> > 感谢,我在之前的邮件记录中搜索到了答案。我现在遇到了新的问题,卡主了好久: >> > >> > Table API, sink to Kafka >> > >> > val result = bsTableEnv.sqlQuery("SELECT * FROM " + "pppppppp") >> > >> > bsTableEnv >> > .connect( >> > new Kafka() >> > .version("0.11") // required: valid connector versions are >> > .topic("aaa") // required: topic name from which the table is >> read >> > .property("zookeeper.connect", "xxx") >> > .property("bootstrap.servers", "yyy") >> > ) >> > .withFormat(new Json()) >> > .withSchema(new Schema() >> > .field("ts", INT()) >> > .field("table", STRING()) >> > .field("database", STRING()) >> > ) >> > .createTemporaryTable("zzzzz") >> > >> > result.insertInto("mmmmm") >> > >> > Error: >> > >> > java.lang.NoSuchMethodError: >> > >> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.<init>(Ljava/lang/String;Lorg/apache/flink/streaming/util/serialization/KeyedSerializationSchema;Ljava/util/Properties;Ljava/util/Optional;)V >> > at >> org.apache.flink.streaming.connectors.kafka.Kafka011TableSink.createKafkaProducer(Kafka011TableSink.java:58) >> > at >> org.apache.flink.streaming.connectors.kafka.KafkaTableSinkBase.consumeDataStream(KafkaTableSinkBase.java:95) >> > at >> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:140) >> > at >> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48) >> > at >> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58) >> > at >> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:48) >> > at >> org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:60) >> > at >> org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:59) >> > at >> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) >> > at >> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) >> > at scala.collection.Iterator$class.foreach(Iterator.scala:891) >> > at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) >> > at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) >> > at scala.collection.AbstractIterable.foreach(Iterable.scala:54) >> > at >> scala.collection.TraversableLike$class.map(TraversableLike.scala:234) >> > at scala.collection.AbstractTraversable.map(Traversable.scala:104) >> > at >> org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:59) >> > at >> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:153) >> > at >> org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:682) >> > at >> org.apache.flink.table.api.internal.TableEnvironmentImpl.insertIntoInternal(TableEnvironmentImpl.java:355) >> > at >> org.apache.flink.table.api.internal.TableEnvironmentImpl.insertInto(TableEnvironmentImpl.java:334) >> > at >> org.apache.flink.table.api.internal.TableImpl.insertInto(TableImpl.java:411) >> > at >> com.shopee.data.ordermart.airpay_v3.AirpayV3Flink$.createPipeline(AirpayV3Flink.scala:74) >> > at >> com.shopee.data.ordermart.airpay_v3.AirpayV3Flink$.main(AirpayV3Flink.scala:30) >> > at >> com.shopee.data.ordermart.airpay_v3.AirpayV3Flink.main(AirpayV3Flink.scala) >> > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) >> > at >> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) >> > at >> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) >> > at java.lang.reflect.Method.invoke(Method.java:498) >> > at >> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321) >> > at >> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205) >> > at >> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138) >> > at >> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664) >> > at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213) >> > at >> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895) >> > at >> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968) >> > at java.security.AccessController.doPrivileged(Native Method) >> > at javax.security.auth.Subject.doAs(Subject.java:422) >> > at >> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1982) >> > at >> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41) >> > at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968) >> > >> > >> > 麻烦帮我看下,谢谢 >> > >> > Lijie Wang <[hidden email]> 于2020年5月25日周一 上午12:34写道: >> > >> >> Hi,我不能加载你邮件中的图片。从下面的报错看起来是因为找不到 match 的connector。可以检查一下 DDL 中的 with >> 属性是否正确。 >> >> >> >> >> >> >> >> 在 2020-05-25 00:11:16,"macia kk" <[hidden email]> 写道: >> >> >> >> 有人帮我看下这个问题吗,谢谢 >> >> >> >> >> >> >> >> >> >> >> >> >> >> org.apache.flink.client.program.ProgramInvocationException: The main >> >> method caused an error: findAndCreateTableSource failed. >> >> Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: >> >> Could not find a suitable table factory for >> >> 'org.apache.flink.table.factories.TableSourceFactory' in >> >> the classpath. >> >> Reason: Required context properties mismatch. >> >> >> >> The matching candidates: >> >> org.apache.flink.table.sources.CsvAppendTableSourceFactory >> >> Mismatched properties: >> >> 'connector.type' expects 'filesystem', but is 'kafka' >> >> 'format.type' expects 'csv', but is 'json' >> >> >> >> The following properties are requested: >> >> connector.properties.bootstrap.servers=ip-10-128- >> >> 145-1.idata-server.shopee.io:9092connector.properties.group.id >> >> =keystats_aripay >> >> connector.property-version=1 >> >> connector.startup-mode=latest-offset >> >> connector.topic=ebisu_wallet_id_db_mirror_v1 >> >> connector.type=kafka >> >> format.property-version=1 >> >> format.type=json >> >> schema.0.data-type=INT >> >> schema.0.name=ts >> >> schema.1.data-type=VARCHAR(2147483647) >> >> schema.1.name=table >> >> schema.2.data-type=VARCHAR(2147483647) >> >> schema.2.name=database >> >> update-mode=append >> >> >> >> The following factories have been considered: >> >> org.apache.flink.table.sources.CsvBatchTableSourceFactory >> >> org.apache.flink.table.sources.CsvAppendTableSourceFactory >> >> org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory >> >> at >> >> >> org.apache.flink.table.factories.TableFactoryService.filterByContext(TableFactoryService.java:322) >> >> at >> >> >> org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:190) >> >> at >> >> >> org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:143) >> >> at >> >> >> org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:96) >> >> at >> >> >> org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:52) >> >> ... 39 more >> >> >> >> |
Hi,
> 对了还有个问题,我之前看文档使用 `flink-connector-kafka_2.11`一直都无法运行,后来看别人也遇到这道这个问题,改成 > `flink-sql-connector-kafka-0.11` > 才可以运行,这两个有什么区别,如果不一样的话,对于 table&SQL API 最好标明一下用后者 flink-connector-kafka_2.11 是dataStream API编程使用的 flink-sql-connector-kafka-0.11_2.11 是 Table API & SQL 编程使用的,其中0.11是kafka版本,2.11是scala版本 如果是Table API & SQL程序不用加 flink-connector-kafka_2.11 的依赖,你的case把dataStream的connector依赖去掉, 把 sql connector的依赖改为 flink-sql-connector-kafka-0.11_2.11 试下 Best, Leonard Xu > > macia kk <[hidden email]> 于2020年5月25日周一 上午10:05写道: > >> built.sbt >> >> val flinkVersion = "1.10.0" >> libraryDependencies ++= Seq( >> "org.apache.flink" %% "flink-streaming-scala" % flinkVersion , >> "org.apache.flink" %% "flink-scala" % flinkVersion, >> "org.apache.flink" %% "flink-statebackend-rocksdb" % flinkVersion, >> >> "org.apache.flink" % "flink-table-common" % flinkVersion, >> "org.apache.flink" %% "flink-table-api-scala" % flinkVersion, >> "org.apache.flink" %% "flink-table-api-scala-bridge" % flinkVersion, >> "org.apache.flink" %% "flink-table-planner-blink" % flinkVersion % "provided", >> >> "org.apache.flink" %% "flink-connector-kafka" % flinkVersion, >> "org.apache.flink" %% "flink-sql-connector-kafka-0.11" % flinkVersion, // <<<<<<<<<<<<<<<<<<<<< Kafka 0.11 >> "org.apache.flink" % "flink-json" % flinkVersion >> ) >> >> >> Leonard Xu <[hidden email]> 于2020年5月25日周一 上午9:33写道: >> >>> Hi, >>> 你使用的kafka connector的版本是0.11的吗?报错看起来有点像版本不对 >>> >>> Best, >>> Leonard Xu >>> >>>> 在 2020年5月25日,02:44,macia kk <[hidden email]> 写道: >>>> >>>> 感谢,我在之前的邮件记录中搜索到了答案。我现在遇到了新的问题,卡主了好久: >>>> >>>> Table API, sink to Kafka >>>> >>>> val result = bsTableEnv.sqlQuery("SELECT * FROM " + "pppppppp") >>>> >>>> bsTableEnv >>>> .connect( >>>> new Kafka() >>>> .version("0.11") // required: valid connector versions are >>>> .topic("aaa") // required: topic name from which the table is >>> read >>>> .property("zookeeper.connect", "xxx") >>>> .property("bootstrap.servers", "yyy") >>>> ) >>>> .withFormat(new Json()) >>>> .withSchema(new Schema() >>>> .field("ts", INT()) >>>> .field("table", STRING()) >>>> .field("database", STRING()) >>>> ) >>>> .createTemporaryTable("zzzzz") >>>> >>>> result.insertInto("mmmmm") >>>> >>>> Error: >>>> >>>> java.lang.NoSuchMethodError: >>>> >>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.<init>(Ljava/lang/String;Lorg/apache/flink/streaming/util/serialization/KeyedSerializationSchema;Ljava/util/Properties;Ljava/util/Optional;)V >>>> at >>> org.apache.flink.streaming.connectors.kafka.Kafka011TableSink.createKafkaProducer(Kafka011TableSink.java:58) >>>> at >>> org.apache.flink.streaming.connectors.kafka.KafkaTableSinkBase.consumeDataStream(KafkaTableSinkBase.java:95) >>>> at >>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:140) >>>> at >>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48) >>>> at >>> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58) >>>> at >>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:48) >>>> at >>> org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:60) >>>> at >>> org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:59) >>>> at >>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) >>>> at >>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) >>>> at scala.collection.Iterator$class.foreach(Iterator.scala:891) >>>> at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) >>>> at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) >>>> at scala.collection.AbstractIterable.foreach(Iterable.scala:54) >>>> at >>> scala.collection.TraversableLike$class.map(TraversableLike.scala:234) >>>> at scala.collection.AbstractTraversable.map(Traversable.scala:104) >>>> at >>> org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:59) >>>> at >>> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:153) >>>> at >>> org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:682) >>>> at >>> org.apache.flink.table.api.internal.TableEnvironmentImpl.insertIntoInternal(TableEnvironmentImpl.java:355) >>>> at >>> org.apache.flink.table.api.internal.TableEnvironmentImpl.insertInto(TableEnvironmentImpl.java:334) >>>> at >>> org.apache.flink.table.api.internal.TableImpl.insertInto(TableImpl.java:411) >>>> at >>> com.shopee.data.ordermart.airpay_v3.AirpayV3Flink$.createPipeline(AirpayV3Flink.scala:74) >>>> at >>> com.shopee.data.ordermart.airpay_v3.AirpayV3Flink$.main(AirpayV3Flink.scala:30) >>>> at >>> com.shopee.data.ordermart.airpay_v3.AirpayV3Flink.main(AirpayV3Flink.scala) >>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) >>>> at >>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) >>>> at >>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) >>>> at java.lang.reflect.Method.invoke(Method.java:498) >>>> at >>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321) >>>> at >>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205) >>>> at >>> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138) >>>> at >>> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664) >>>> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213) >>>> at >>> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895) >>>> at >>> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968) >>>> at java.security.AccessController.doPrivileged(Native Method) >>>> at javax.security.auth.Subject.doAs(Subject.java:422) >>>> at >>> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1982) >>>> at >>> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41) >>>> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968) >>>> >>>> >>>> 麻烦帮我看下,谢谢 >>>> >>>> Lijie Wang <[hidden email]> 于2020年5月25日周一 上午12:34写道: >>>> >>>>> Hi,我不能加载你邮件中的图片。从下面的报错看起来是因为找不到 match 的connector。可以检查一下 DDL 中的 with >>> 属性是否正确。 >>>>> >>>>> >>>>> >>>>> 在 2020-05-25 00:11:16,"macia kk" <[hidden email]> 写道: >>>>> >>>>> 有人帮我看下这个问题吗,谢谢 >>>>> >>>>> >>>>> >>>>> >>>>> >>>>> >>>>> org.apache.flink.client.program.ProgramInvocationException: The main >>>>> method caused an error: findAndCreateTableSource failed. >>>>> Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: >>>>> Could not find a suitable table factory for >>>>> 'org.apache.flink.table.factories.TableSourceFactory' in >>>>> the classpath. >>>>> Reason: Required context properties mismatch. >>>>> >>>>> The matching candidates: >>>>> org.apache.flink.table.sources.CsvAppendTableSourceFactory >>>>> Mismatched properties: >>>>> 'connector.type' expects 'filesystem', but is 'kafka' >>>>> 'format.type' expects 'csv', but is 'json' >>>>> >>>>> The following properties are requested: >>>>> connector.properties.bootstrap.servers=ip-10-128- >>>>> 145-1.idata-server.shopee.io:9092connector.properties.group.id >>>>> =keystats_aripay >>>>> connector.property-version=1 >>>>> connector.startup-mode=latest-offset >>>>> connector.topic=ebisu_wallet_id_db_mirror_v1 >>>>> connector.type=kafka >>>>> format.property-version=1 >>>>> format.type=json >>>>> schema.0.data-type=INT >>>>> schema.0.name=ts >>>>> schema.1.data-type=VARCHAR(2147483647) >>>>> schema.1.name=table >>>>> schema.2.data-type=VARCHAR(2147483647) >>>>> schema.2.name=database >>>>> update-mode=append >>>>> >>>>> The following factories have been considered: >>>>> org.apache.flink.table.sources.CsvBatchTableSourceFactory >>>>> org.apache.flink.table.sources.CsvAppendTableSourceFactory >>>>> org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory >>>>> at >>>>> >>> org.apache.flink.table.factories.TableFactoryService.filterByContext(TableFactoryService.java:322) >>>>> at >>>>> >>> org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:190) >>>>> at >>>>> >>> org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:143) >>>>> at >>>>> >>> org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:96) >>>>> at >>>>> >>> org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:52) >>>>> ... 39 more >>>>> >>> >>> |
非常感谢大佬,😀好人一生平安
看来是 flink-connector-kafka_2.11 留在 build.sbt 里冲突了 我好奇的是,作为新人,我没招文档里找到要用到 `flink-sql-connector-kafka`, 搜了很多才知道是这个问题 Leonard Xu <[hidden email]> 于2020年5月25日周一 上午10:44写道: > Hi, > > > > 对了还有个问题,我之前看文档使用 `flink-connector-kafka_2.11`一直都无法运行,后来看别人也遇到这道这个问题,改成 > > `flink-sql-connector-kafka-0.11` > > 才可以运行,这两个有什么区别,如果不一样的话,对于 table&SQL API 最好标明一下用后者 > > flink-connector-kafka_2.11 是dataStream API编程使用的 > flink-sql-connector-kafka-0.11_2.11 是 Table API & SQL > 编程使用的,其中0.11是kafka版本,2.11是scala版本 > 如果是Table API & SQL程序不用加 flink-connector-kafka_2.11 > 的依赖,你的case把dataStream的connector依赖去掉, > 把 sql connector的依赖改为 flink-sql-connector-kafka-0.11_2.11 试下 > > > Best, > Leonard Xu > > > > > > > > > > macia kk <[hidden email]> 于2020年5月25日周一 上午10:05写道: > > > >> built.sbt > >> > >> val flinkVersion = "1.10.0" > >> libraryDependencies ++= Seq( > >> "org.apache.flink" %% "flink-streaming-scala" % flinkVersion , > >> "org.apache.flink" %% "flink-scala" % flinkVersion, > >> "org.apache.flink" %% "flink-statebackend-rocksdb" % flinkVersion, > >> > >> "org.apache.flink" % "flink-table-common" % flinkVersion, > >> "org.apache.flink" %% "flink-table-api-scala" % flinkVersion, > >> "org.apache.flink" %% "flink-table-api-scala-bridge" % flinkVersion, > >> "org.apache.flink" %% "flink-table-planner-blink" % flinkVersion % > "provided", > >> > >> "org.apache.flink" %% "flink-connector-kafka" % flinkVersion, > >> "org.apache.flink" %% "flink-sql-connector-kafka-0.11" % > flinkVersion, // <<<<<<<<<<<<<<<<<<<<< Kafka 0.11 > >> "org.apache.flink" % "flink-json" % flinkVersion > >> ) > >> > >> > >> Leonard Xu <[hidden email]> 于2020年5月25日周一 上午9:33写道: > >> > >>> Hi, > >>> 你使用的kafka connector的版本是0.11的吗?报错看起来有点像版本不对 > >>> > >>> Best, > >>> Leonard Xu > >>> > >>>> 在 2020年5月25日,02:44,macia kk <[hidden email]> 写道: > >>>> > >>>> 感谢,我在之前的邮件记录中搜索到了答案。我现在遇到了新的问题,卡主了好久: > >>>> > >>>> Table API, sink to Kafka > >>>> > >>>> val result = bsTableEnv.sqlQuery("SELECT * FROM " + "pppppppp") > >>>> > >>>> bsTableEnv > >>>> .connect( > >>>> new Kafka() > >>>> .version("0.11") // required: valid connector versions are > >>>> .topic("aaa") // required: topic name from which the table is > >>> read > >>>> .property("zookeeper.connect", "xxx") > >>>> .property("bootstrap.servers", "yyy") > >>>> ) > >>>> .withFormat(new Json()) > >>>> .withSchema(new Schema() > >>>> .field("ts", INT()) > >>>> .field("table", STRING()) > >>>> .field("database", STRING()) > >>>> ) > >>>> .createTemporaryTable("zzzzz") > >>>> > >>>> result.insertInto("mmmmm") > >>>> > >>>> Error: > >>>> > >>>> java.lang.NoSuchMethodError: > >>>> > >>> > org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.<init>(Ljava/lang/String;Lorg/apache/flink/streaming/util/serialization/KeyedSerializationSchema;Ljava/util/Properties;Ljava/util/Optional;)V > >>>> at > >>> > org.apache.flink.streaming.connectors.kafka.Kafka011TableSink.createKafkaProducer(Kafka011TableSink.java:58) > >>>> at > >>> > org.apache.flink.streaming.connectors.kafka.KafkaTableSinkBase.consumeDataStream(KafkaTableSinkBase.java:95) > >>>> at > >>> > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:140) > >>>> at > >>> > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48) > >>>> at > >>> > org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58) > >>>> at > >>> > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:48) > >>>> at > >>> > org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:60) > >>>> at > >>> > org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:59) > >>>> at > >>> > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > >>>> at > >>> > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > >>>> at scala.collection.Iterator$class.foreach(Iterator.scala:891) > >>>> at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) > >>>> at > scala.collection.IterableLike$class.foreach(IterableLike.scala:72) > >>>> at scala.collection.AbstractIterable.foreach(Iterable.scala:54) > >>>> at > >>> scala.collection.TraversableLike$class.map(TraversableLike.scala:234) > >>>> at scala.collection.AbstractTraversable.map(Traversable.scala:104) > >>>> at > >>> > org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:59) > >>>> at > >>> > org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:153) > >>>> at > >>> > org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:682) > >>>> at > >>> > org.apache.flink.table.api.internal.TableEnvironmentImpl.insertIntoInternal(TableEnvironmentImpl.java:355) > >>>> at > >>> > org.apache.flink.table.api.internal.TableEnvironmentImpl.insertInto(TableEnvironmentImpl.java:334) > >>>> at > >>> > org.apache.flink.table.api.internal.TableImpl.insertInto(TableImpl.java:411) > >>>> at > >>> > com.shopee.data.ordermart.airpay_v3.AirpayV3Flink$.createPipeline(AirpayV3Flink.scala:74) > >>>> at > >>> > com.shopee.data.ordermart.airpay_v3.AirpayV3Flink$.main(AirpayV3Flink.scala:30) > >>>> at > >>> > com.shopee.data.ordermart.airpay_v3.AirpayV3Flink.main(AirpayV3Flink.scala) > >>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > >>>> at > >>> > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > >>>> at > >>> > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > >>>> at java.lang.reflect.Method.invoke(Method.java:498) > >>>> at > >>> > org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321) > >>>> at > >>> > org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205) > >>>> at > >>> > org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138) > >>>> at > >>> > org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664) > >>>> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213) > >>>> at > >>> > org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895) > >>>> at > >>> > org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968) > >>>> at java.security.AccessController.doPrivileged(Native Method) > >>>> at javax.security.auth.Subject.doAs(Subject.java:422) > >>>> at > >>> > org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1982) > >>>> at > >>> > org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41) > >>>> at > org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968) > >>>> > >>>> > >>>> 麻烦帮我看下,谢谢 > >>>> > >>>> Lijie Wang <[hidden email]> 于2020年5月25日周一 上午12:34写道: > >>>> > >>>>> Hi,我不能加载你邮件中的图片。从下面的报错看起来是因为找不到 match 的connector。可以检查一下 DDL 中的 with > >>> 属性是否正确。 > >>>>> > >>>>> > >>>>> > >>>>> 在 2020-05-25 00:11:16,"macia kk" <[hidden email]> 写道: > >>>>> > >>>>> 有人帮我看下这个问题吗,谢谢 > >>>>> > >>>>> > >>>>> > >>>>> > >>>>> > >>>>> > >>>>> org.apache.flink.client.program.ProgramInvocationException: The main > >>>>> method caused an error: findAndCreateTableSource failed. > >>>>> Caused by: > org.apache.flink.table.api.NoMatchingTableFactoryException: > >>>>> Could not find a suitable table factory for > >>>>> 'org.apache.flink.table.factories.TableSourceFactory' in > >>>>> the classpath. > >>>>> Reason: Required context properties mismatch. > >>>>> > >>>>> The matching candidates: > >>>>> org.apache.flink.table.sources.CsvAppendTableSourceFactory > >>>>> Mismatched properties: > >>>>> 'connector.type' expects 'filesystem', but is 'kafka' > >>>>> 'format.type' expects 'csv', but is 'json' > >>>>> > >>>>> The following properties are requested: > >>>>> connector.properties.bootstrap.servers=ip-10-128- > >>>>> 145-1.idata-server.shopee.io:9092connector.properties.group.id > >>>>> =keystats_aripay > >>>>> connector.property-version=1 > >>>>> connector.startup-mode=latest-offset > >>>>> connector.topic=ebisu_wallet_id_db_mirror_v1 > >>>>> connector.type=kafka > >>>>> format.property-version=1 > >>>>> format.type=json > >>>>> schema.0.data-type=INT > >>>>> schema.0.name=ts > >>>>> schema.1.data-type=VARCHAR(2147483647) > >>>>> schema.1.name=table > >>>>> schema.2.data-type=VARCHAR(2147483647) > >>>>> schema.2.name=database > >>>>> update-mode=append > >>>>> > >>>>> The following factories have been considered: > >>>>> org.apache.flink.table.sources.CsvBatchTableSourceFactory > >>>>> org.apache.flink.table.sources.CsvAppendTableSourceFactory > >>>>> > org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory > >>>>> at > >>>>> > >>> > org.apache.flink.table.factories.TableFactoryService.filterByContext(TableFactoryService.java:322) > >>>>> at > >>>>> > >>> > org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:190) > >>>>> at > >>>>> > >>> > org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:143) > >>>>> at > >>>>> > >>> > org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:96) > >>>>> at > >>>>> > >>> > org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:52) > >>>>> ... 39 more > >>>>> > >>> > >>> > > |
Hi,
确实,connector包太多,DataStream 和 Table&SQL 分两套的问题,format的包也需要用户导入问题,确实比较困扰用户。 社区也在讨论flink打包方案[1]来降低用户接入成本。 祝好, Leonard Xu [1]http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Releasing-quot-fat-quot-and-quot-slim-quot-Flink-distributions-tc40237.html#none <http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Releasing-quot-fat-quot-and-quot-slim-quot-Flink-distributions-tc40237.html#none> > 在 2020年5月25日,11:12,macia kk <[hidden email]> 写道: > > 非常感谢大佬,😀好人一生平安 > > 看来是 flink-connector-kafka_2.11 留在 build.sbt 里冲突了 > > 我好奇的是,作为新人,我没招文档里找到要用到 `flink-sql-connector-kafka`, 搜了很多才知道是这个问题 > > Leonard Xu <[hidden email]> 于2020年5月25日周一 上午10:44写道: > >> Hi, >> >> >>> 对了还有个问题,我之前看文档使用 `flink-connector-kafka_2.11`一直都无法运行,后来看别人也遇到这道这个问题,改成 >>> `flink-sql-connector-kafka-0.11` >>> 才可以运行,这两个有什么区别,如果不一样的话,对于 table&SQL API 最好标明一下用后者 >> >> flink-connector-kafka_2.11 是dataStream API编程使用的 >> flink-sql-connector-kafka-0.11_2.11 是 Table API & SQL >> 编程使用的,其中0.11是kafka版本,2.11是scala版本 >> 如果是Table API & SQL程序不用加 flink-connector-kafka_2.11 >> 的依赖,你的case把dataStream的connector依赖去掉, >> 把 sql connector的依赖改为 flink-sql-connector-kafka-0.11_2.11 试下 >> >> >> Best, >> Leonard Xu >> >> >> >> >> >> >>> >>> macia kk <[hidden email]> 于2020年5月25日周一 上午10:05写道: >>> >>>> built.sbt >>>> >>>> val flinkVersion = "1.10.0" >>>> libraryDependencies ++= Seq( >>>> "org.apache.flink" %% "flink-streaming-scala" % flinkVersion , >>>> "org.apache.flink" %% "flink-scala" % flinkVersion, >>>> "org.apache.flink" %% "flink-statebackend-rocksdb" % flinkVersion, >>>> >>>> "org.apache.flink" % "flink-table-common" % flinkVersion, >>>> "org.apache.flink" %% "flink-table-api-scala" % flinkVersion, >>>> "org.apache.flink" %% "flink-table-api-scala-bridge" % flinkVersion, >>>> "org.apache.flink" %% "flink-table-planner-blink" % flinkVersion % >> "provided", >>>> >>>> "org.apache.flink" %% "flink-connector-kafka" % flinkVersion, >>>> "org.apache.flink" %% "flink-sql-connector-kafka-0.11" % >> flinkVersion, // <<<<<<<<<<<<<<<<<<<<< Kafka 0.11 >>>> "org.apache.flink" % "flink-json" % flinkVersion >>>> ) >>>> >>>> >>>> Leonard Xu <[hidden email]> 于2020年5月25日周一 上午9:33写道: >>>> >>>>> Hi, >>>>> 你使用的kafka connector的版本是0.11的吗?报错看起来有点像版本不对 >>>>> >>>>> Best, >>>>> Leonard Xu >>>>> >>>>>> 在 2020年5月25日,02:44,macia kk <[hidden email]> 写道: >>>>>> >>>>>> 感谢,我在之前的邮件记录中搜索到了答案。我现在遇到了新的问题,卡主了好久: >>>>>> >>>>>> Table API, sink to Kafka >>>>>> >>>>>> val result = bsTableEnv.sqlQuery("SELECT * FROM " + "pppppppp") >>>>>> >>>>>> bsTableEnv >>>>>> .connect( >>>>>> new Kafka() >>>>>> .version("0.11") // required: valid connector versions are >>>>>> .topic("aaa") // required: topic name from which the table is >>>>> read >>>>>> .property("zookeeper.connect", "xxx") >>>>>> .property("bootstrap.servers", "yyy") >>>>>> ) >>>>>> .withFormat(new Json()) >>>>>> .withSchema(new Schema() >>>>>> .field("ts", INT()) >>>>>> .field("table", STRING()) >>>>>> .field("database", STRING()) >>>>>> ) >>>>>> .createTemporaryTable("zzzzz") >>>>>> >>>>>> result.insertInto("mmmmm") >>>>>> >>>>>> Error: >>>>>> >>>>>> java.lang.NoSuchMethodError: >>>>>> >>>>> >> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.<init>(Ljava/lang/String;Lorg/apache/flink/streaming/util/serialization/KeyedSerializationSchema;Ljava/util/Properties;Ljava/util/Optional;)V >>>>>> at >>>>> >> org.apache.flink.streaming.connectors.kafka.Kafka011TableSink.createKafkaProducer(Kafka011TableSink.java:58) >>>>>> at >>>>> >> org.apache.flink.streaming.connectors.kafka.KafkaTableSinkBase.consumeDataStream(KafkaTableSinkBase.java:95) >>>>>> at >>>>> >> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:140) >>>>>> at >>>>> >> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48) >>>>>> at >>>>> >> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58) >>>>>> at >>>>> >> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:48) >>>>>> at >>>>> >> org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:60) >>>>>> at >>>>> >> org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:59) >>>>>> at >>>>> >> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) >>>>>> at >>>>> >> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) >>>>>> at scala.collection.Iterator$class.foreach(Iterator.scala:891) >>>>>> at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) >>>>>> at >> scala.collection.IterableLike$class.foreach(IterableLike.scala:72) >>>>>> at scala.collection.AbstractIterable.foreach(Iterable.scala:54) >>>>>> at >>>>> scala.collection.TraversableLike$class.map(TraversableLike.scala:234) >>>>>> at scala.collection.AbstractTraversable.map(Traversable.scala:104) >>>>>> at >>>>> >> org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:59) >>>>>> at >>>>> >> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:153) >>>>>> at >>>>> >> org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:682) >>>>>> at >>>>> >> org.apache.flink.table.api.internal.TableEnvironmentImpl.insertIntoInternal(TableEnvironmentImpl.java:355) >>>>>> at >>>>> >> org.apache.flink.table.api.internal.TableEnvironmentImpl.insertInto(TableEnvironmentImpl.java:334) >>>>>> at >>>>> >> org.apache.flink.table.api.internal.TableImpl.insertInto(TableImpl.java:411) >>>>>> at >>>>> >> com.shopee.data.ordermart.airpay_v3.AirpayV3Flink$.createPipeline(AirpayV3Flink.scala:74) >>>>>> at >>>>> >> com.shopee.data.ordermart.airpay_v3.AirpayV3Flink$.main(AirpayV3Flink.scala:30) >>>>>> at >>>>> >> com.shopee.data.ordermart.airpay_v3.AirpayV3Flink.main(AirpayV3Flink.scala) >>>>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) >>>>>> at >>>>> >> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) >>>>>> at >>>>> >> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) >>>>>> at java.lang.reflect.Method.invoke(Method.java:498) >>>>>> at >>>>> >> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321) >>>>>> at >>>>> >> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205) >>>>>> at >>>>> >> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138) >>>>>> at >>>>> >> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664) >>>>>> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213) >>>>>> at >>>>> >> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895) >>>>>> at >>>>> >> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968) >>>>>> at java.security.AccessController.doPrivileged(Native Method) >>>>>> at javax.security.auth.Subject.doAs(Subject.java:422) >>>>>> at >>>>> >> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1982) >>>>>> at >>>>> >> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41) >>>>>> at >> org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968) >>>>>> >>>>>> >>>>>> 麻烦帮我看下,谢谢 >>>>>> >>>>>> Lijie Wang <[hidden email]> 于2020年5月25日周一 上午12:34写道: >>>>>> >>>>>>> Hi,我不能加载你邮件中的图片。从下面的报错看起来是因为找不到 match 的connector。可以检查一下 DDL 中的 with >>>>> 属性是否正确。 >>>>>>> >>>>>>> >>>>>>> >>>>>>> 在 2020-05-25 00:11:16,"macia kk" <[hidden email]> 写道: >>>>>>> >>>>>>> 有人帮我看下这个问题吗,谢谢 >>>>>>> >>>>>>> >>>>>>> >>>>>>> >>>>>>> >>>>>>> >>>>>>> org.apache.flink.client.program.ProgramInvocationException: The main >>>>>>> method caused an error: findAndCreateTableSource failed. >>>>>>> Caused by: >> org.apache.flink.table.api.NoMatchingTableFactoryException: >>>>>>> Could not find a suitable table factory for >>>>>>> 'org.apache.flink.table.factories.TableSourceFactory' in >>>>>>> the classpath. >>>>>>> Reason: Required context properties mismatch. >>>>>>> >>>>>>> The matching candidates: >>>>>>> org.apache.flink.table.sources.CsvAppendTableSourceFactory >>>>>>> Mismatched properties: >>>>>>> 'connector.type' expects 'filesystem', but is 'kafka' >>>>>>> 'format.type' expects 'csv', but is 'json' >>>>>>> >>>>>>> The following properties are requested: >>>>>>> connector.properties.bootstrap.servers=ip-10-128- >>>>>>> 145-1.idata-server.shopee.io:9092connector.properties.group.id >>>>>>> =keystats_aripay >>>>>>> connector.property-version=1 >>>>>>> connector.startup-mode=latest-offset >>>>>>> connector.topic=ebisu_wallet_id_db_mirror_v1 >>>>>>> connector.type=kafka >>>>>>> format.property-version=1 >>>>>>> format.type=json >>>>>>> schema.0.data-type=INT >>>>>>> schema.0.name=ts >>>>>>> schema.1.data-type=VARCHAR(2147483647) >>>>>>> schema.1.name=table >>>>>>> schema.2.data-type=VARCHAR(2147483647) >>>>>>> schema.2.name=database >>>>>>> update-mode=append >>>>>>> >>>>>>> The following factories have been considered: >>>>>>> org.apache.flink.table.sources.CsvBatchTableSourceFactory >>>>>>> org.apache.flink.table.sources.CsvAppendTableSourceFactory >>>>>>> >> org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory >>>>>>> at >>>>>>> >>>>> >> org.apache.flink.table.factories.TableFactoryService.filterByContext(TableFactoryService.java:322) >>>>>>> at >>>>>>> >>>>> >> org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:190) >>>>>>> at >>>>>>> >>>>> >> org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:143) >>>>>>> at >>>>>>> >>>>> >> org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:96) >>>>>>> at >>>>>>> >>>>> >> org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:52) >>>>>>> ... 39 more >>>>>>> >>>>> >>>>> >> >> |
Free forum by Nabble | Edit this page |