大家好,
我周末升级到了1.9.0,但是在初始化table env的时候编译不通过,请大家帮忙看看是什么原因, import org.apache.flink.streaming.api.CheckpointingMode 提示有多个实现: 下面是pom文件: <dependency> |
试试把
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment 换成 import org.apache.flink.table.api.scala.StreamExecutionEnvironment 应该是意外 import 了不同包下的同名类的缘故 Best, tison. ddwcg <[hidden email]> 于2019年8月26日周一 上午11:12写道: > 大家好, > 我周末升级到了1.9.0,但是在初始化table env的时候编译不通过,请大家帮忙看看是什么原因, > > import org.apache.flink.streaming.api.CheckpointingMode > import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup > import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment > import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer > import org.apache.flink.streaming.util.serialization.SimpleStringSchema > import org.apache.flink.table.api.scala.StreamTableEnvironment > import org.apache.flink.table.planner.expressions.StddevPop > import org.apache.kafka.clients.consumer.ConsumerConfig > import org.apache.kafka.clients.producer.ProducerConfig > > object StreamingJob { > def main(args: Array[String]) { > val kafkaTopic = "source.kafka.topic" > val jobName ="test" > val parallelism =1 > val checkPointPath ="checkpoint/" > val kafkaBrokers ="" > > // set up the streaming execution environment > val env = StreamExecutionEnvironment.getExecutionEnvironment > env.setParallelism(parallelism) > env.enableCheckpointing(10000) > env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE) > env.getCheckpointConfig.setMaxConcurrentCheckpoints(1) > env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION) > //env.setStateBackend(new FsStateBackend(checkPointPath)) > > > val tableEnv = StreamTableEnvironment.create(env) > > > 提示有多个实现: > > 下面是pom文件: > > <dependency> > <groupId>org.apache.flink</groupId> > <artifactId>flink-scala_${scala.binary.version}</artifactId> > <version>${flink.version}</version> > <scope>compile</scope> > </dependency> > <dependency> > <groupId>org.apache.flink</groupId> > <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId> > <version>${flink.version}</version> > <scope>compile</scope> > </dependency> > <dependency> > <groupId>org.apache.flink</groupId> > <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId> > <version>${flink.version}</version> > <scope>provided</scope> > </dependency> > <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-runtime-blink --> > <dependency> > <groupId>org.apache.flink</groupId> > <artifactId>flink-table-runtime-blink_2.11</artifactId> > <version>1.9.0</version> > </dependency> > <dependency> > <groupId>org.apache.flink</groupId> > <artifactId>flink-connector-kafka_2.11</artifactId> > <version>1.9.0</version> > </dependency> > <dependency> > <groupId>org.apache.flink</groupId> > <artifactId>flink-table-common</artifactId> > <version>${flink.version}</version> > <scope>provided</scope> > </dependency> > > > > |
感谢您的回复,确实是这个原因。但是后面注册表的时候不能使用 Expression
总是感觉 java api 和scala api有点混乱了
|
不应该呀,我看到仍然有 def registerDataStream[T](name: String, dataStream: DataStream[T], fields: Expression*): Unit 这个方法的,你能提供完整一点的上下文和报错吗? Best, tison. ddwcg <[hidden email]> 于2019年8月26日周一 上午11:38写道:
|
Administrator
|
Hi,
关于 Expression的问题,你需要额外加入 import org.apache.flink.table.api._ 的 import。 See release note for more details: https://ci.apache.org/projects/flink/flink-docs-release-1.9/release-notes/flink-1.9.html#scala-expression-dsl-for-table-api-moved-to-flink-table-api-scala <https://ci.apache.org/projects/flink/flink-docs-release-1.9/release-notes/flink-1.9.html#scala-expression-dsl-for-table-api-moved-to-flink-table-api-scala> > 在 2019年8月26日,11:53,Zili Chen <[hidden email]> 写道: > > 不应该呀,我看到仍然有 > > def registerDataStream[T](name: String, dataStream: DataStream[T], fields: Expression*): Unit > > 这个方法的,你能提供完整一点的上下文和报错吗? > > Best, > tison. > > > ddwcg <[hidden email] <mailto:[hidden email]>> 于2019年8月26日周一 上午11:38写道: > 感谢您的回复,确实是这个原因。但是后面注册表的时候不能使用 Expression > 总是感觉 java api 和scala api有点混乱了 > > > >> 在 2019年8月26日,11:22,Zili Chen <[hidden email] <mailto:[hidden email]>> 写道: >> >> 试试把 >> >> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment >> >> 换成 >> >> import org.apache.flink.table.api.scala.StreamExecutionEnvironment >> >> 应该是意外 import 了不同包下的同名类的缘故 >> >> Best, >> tison. >> >> >> ddwcg <[hidden email] <mailto:[hidden email]>> 于2019年8月26日周一 上午11:12写道: >> >>> 大家好, >>> 我周末升级到了1.9.0,但是在初始化table env的时候编译不通过,请大家帮忙看看是什么原因, >>> >>> import org.apache.flink.streaming.api.CheckpointingMode >>> import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup >>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment >>> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer >>> import org.apache.flink.streaming.util.serialization.SimpleStringSchema >>> import org.apache.flink.table.api.scala.StreamTableEnvironment >>> import org.apache.flink.table.planner.expressions.StddevPop >>> import org.apache.kafka.clients.consumer.ConsumerConfig >>> import org.apache.kafka.clients.producer.ProducerConfig >>> >>> object StreamingJob { >>> def main(args: Array[String]) { >>> val kafkaTopic = "source.kafka.topic" >>> val jobName ="test" >>> val parallelism =1 >>> val checkPointPath ="checkpoint/" >>> val kafkaBrokers ="" >>> >>> // set up the streaming execution environment >>> val env = StreamExecutionEnvironment.getExecutionEnvironment >>> env.setParallelism(parallelism) >>> env.enableCheckpointing(10000) >>> env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE) >>> env.getCheckpointConfig.setMaxConcurrentCheckpoints(1) >>> env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION) >>> //env.setStateBackend(new FsStateBackend(checkPointPath)) >>> >>> >>> val tableEnv = StreamTableEnvironment.create(env) >>> >>> >>> 提示有多个实现: >>> >>> 下面是pom文件: >>> >>> <dependency> >>> <groupId>org.apache.flink</groupId> >>> <artifactId>flink-scala_${scala.binary.version}</artifactId> >>> <version>${flink.version}</version> >>> <scope>compile</scope> >>> </dependency> >>> <dependency> >>> <groupId>org.apache.flink</groupId> >>> <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId> >>> <version>${flink.version}</version> >>> <scope>compile</scope> >>> </dependency> >>> <dependency> >>> <groupId>org.apache.flink</groupId> >>> <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId> >>> <version>${flink.version}</version> >>> <scope>provided</scope> >>> </dependency> >>> <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-runtime-blink <https://mvnrepository.com/artifact/org.apache.flink/flink-table-runtime-blink> --> >>> <dependency> >>> <groupId>org.apache.flink</groupId> >>> <artifactId>flink-table-runtime-blink_2.11</artifactId> >>> <version>1.9.0</version> >>> </dependency> >>> <dependency> >>> <groupId>org.apache.flink</groupId> >>> <artifactId>flink-connector-kafka_2.11</artifactId> >>> <version>1.9.0</version> >>> </dependency> >>> <dependency> >>> <groupId>org.apache.flink</groupId> >>> <artifactId>flink-table-common</artifactId> >>> <version>${flink.version}</version> >>> <scope>provided</scope> >>> </dependency> >>> >>> >>> >>> > |
按照两位的方法修改后已经可以了,谢谢两位
> 在 2019年8月26日,12:28,Jark Wu <[hidden email]> 写道: > > Hi, > > 关于 Expression的问题,你需要额外加入 import org.apache.flink.table.api._ 的 import。 > > See release note for more details: https://ci.apache.org/projects/flink/flink-docs-release-1.9/release-notes/flink-1.9.html#scala-expression-dsl-for-table-api-moved-to-flink-table-api-scala <https://ci.apache.org/projects/flink/flink-docs-release-1.9/release-notes/flink-1.9.html#scala-expression-dsl-for-table-api-moved-to-flink-table-api-scala> > >> 在 2019年8月26日,11:53,Zili Chen <[hidden email]> 写道: >> >> 不应该呀,我看到仍然有 >> >> def registerDataStream[T](name: String, dataStream: DataStream[T], fields: Expression*): Unit >> >> 这个方法的,你能提供完整一点的上下文和报错吗? >> >> Best, >> tison. >> >> >> ddwcg <[hidden email] <mailto:[hidden email]>> 于2019年8月26日周一 上午11:38写道: >> 感谢您的回复,确实是这个原因。但是后面注册表的时候不能使用 Expression >> 总是感觉 java api 和scala api有点混乱了 >> >> >> >>> 在 2019年8月26日,11:22,Zili Chen <[hidden email] <mailto:[hidden email]>> 写道: >>> >>> 试试把 >>> >>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment >>> >>> 换成 >>> >>> import org.apache.flink.table.api.scala.StreamExecutionEnvironment >>> >>> 应该是意外 import 了不同包下的同名类的缘故 >>> >>> Best, >>> tison. >>> >>> >>> ddwcg <[hidden email] <mailto:[hidden email]>> 于2019年8月26日周一 上午11:12写道: >>> >>>> 大家好, >>>> 我周末升级到了1.9.0,但是在初始化table env的时候编译不通过,请大家帮忙看看是什么原因, >>>> >>>> import org.apache.flink.streaming.api.CheckpointingMode >>>> import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup >>>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment >>>> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer >>>> import org.apache.flink.streaming.util.serialization.SimpleStringSchema >>>> import org.apache.flink.table.api.scala.StreamTableEnvironment >>>> import org.apache.flink.table.planner.expressions.StddevPop >>>> import org.apache.kafka.clients.consumer.ConsumerConfig >>>> import org.apache.kafka.clients.producer.ProducerConfig >>>> >>>> object StreamingJob { >>>> def main(args: Array[String]) { >>>> val kafkaTopic = "source.kafka.topic" >>>> val jobName ="test" >>>> val parallelism =1 >>>> val checkPointPath ="checkpoint/" >>>> val kafkaBrokers ="" >>>> >>>> // set up the streaming execution environment >>>> val env = StreamExecutionEnvironment.getExecutionEnvironment >>>> env.setParallelism(parallelism) >>>> env.enableCheckpointing(10000) >>>> env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE) >>>> env.getCheckpointConfig.setMaxConcurrentCheckpoints(1) >>>> env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION) >>>> //env.setStateBackend(new FsStateBackend(checkPointPath)) >>>> >>>> >>>> val tableEnv = StreamTableEnvironment.create(env) >>>> >>>> >>>> 提示有多个实现: >>>> >>>> 下面是pom文件: >>>> >>>> <dependency> >>>> <groupId>org.apache.flink</groupId> >>>> <artifactId>flink-scala_${scala.binary.version}</artifactId> >>>> <version>${flink.version}</version> >>>> <scope>compile</scope> >>>> </dependency> >>>> <dependency> >>>> <groupId>org.apache.flink</groupId> >>>> <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId> >>>> <version>${flink.version}</version> >>>> <scope>compile</scope> >>>> </dependency> >>>> <dependency> >>>> <groupId>org.apache.flink</groupId> >>>> <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId> >>>> <version>${flink.version}</version> >>>> <scope>provided</scope> >>>> </dependency> >>>> <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-runtime-blink <https://mvnrepository.com/artifact/org.apache.flink/flink-table-runtime-blink> --> >>>> <dependency> >>>> <groupId>org.apache.flink</groupId> >>>> <artifactId>flink-table-runtime-blink_2.11</artifactId> >>>> <version>1.9.0</version> >>>> </dependency> >>>> <dependency> >>>> <groupId>org.apache.flink</groupId> >>>> <artifactId>flink-connector-kafka_2.11</artifactId> >>>> <version>1.9.0</version> >>>> </dependency> >>>> <dependency> >>>> <groupId>org.apache.flink</groupId> >>>> <artifactId>flink-table-common</artifactId> >>>> <version>${flink.version}</version> >>>> <scope>provided</scope> >>>> </dependency> >>>> >>>> >>>> >>>> >> > |
Free forum by Nabble | Edit this page |