Flink convert Table to DataSet[Row]

classic Classic list List threaded Threaded
9 messages Options
Reply | Threaded
Open this post in threaded view
|

Flink convert Table to DataSet[Row]

张锴
我在测试将hive查询出的数据转换成DataSet[Row]时,出现一些无法解决的问题,代码和异常 在附件中,麻烦各位小伙伴给看一下。
Reply | Threaded
Open this post in threaded view
|

Re: Flink convert Table to DataSet[Row]

Jingsong Li
Hi,

没看见有附件,请问为啥需要转车DateSet,Table里有啥搞不定呢?

Best,
Jingsong Lee

On Wed, May 20, 2020 at 1:26 PM 张锴 <[hidden email]> wrote:

> 我在测试将hive查询出的数据转换成DataSet[Row]时,出现一些无法解决的问题,代码和异常 在附件中,麻烦各位小伙伴给看一下。
>


--
Best, Jingsong Lee
Reply | Threaded
Open this post in threaded view
|

Re: Flink convert Table to DataSet[Row]

张锴
业务需要转成这种数据格式,以便后续指标计算。我直接插入图片吧


Jingsong Li <[hidden email]> 于2020年5月20日周三 下午1:30写道:

> Hi,
>
> 没看见有附件,请问为啥需要转车DateSet,Table里有啥搞不定呢?
>
> Best,
> Jingsong Lee
>
> On Wed, May 20, 2020 at 1:26 PM 张锴 <[hidden email]> wrote:
>
> > 我在测试将hive查询出的数据转换成DataSet[Row]时,出现一些无法解决的问题,代码和异常 在附件中,麻烦各位小伙伴给看一下。
> >
>
>
> --
> Best, Jingsong Lee
>
Reply | Threaded
Open this post in threaded view
|

Re: Flink convert Table to DataSet[Row]

张锴
In reply to this post by Jingsong Li



Jingsong Li <[hidden email]> 于2020年5月20日周三 下午1:30写道:
Hi,

没看见有附件,请问为啥需要转车DateSet,Table里有啥搞不定呢?

Best,
Jingsong Lee

On Wed, May 20, 2020 at 1:26 PM 张锴 <[hidden email]> wrote:

> 我在测试将hive查询出的数据转换成DataSet[Row]时,出现一些无法解决的问题,代码和异常 在附件中,麻烦各位小伙伴给看一下。
>


--
Best, Jingsong Lee
Reply | Threaded
Open this post in threaded view
|

Re: Flink convert Table to DataSet[Row]

Jingsong Li
不好意思,

还是看不到你的图,可以考虑copy异常栈。

方便问一下后续的指标计算用Table/SQL搞不定吗?

Best,
Jingsong Lee

On Wed, May 20, 2020 at 1:52 PM 张锴 <[hidden email]> wrote:

> [image: 微信图片_20200520132244.png]
> [image: 微信图片_20200520132343.png]
>
> Jingsong Li <[hidden email]> 于2020年5月20日周三 下午1:30写道:
>
>> Hi,
>>
>> 没看见有附件,请问为啥需要转车DateSet,Table里有啥搞不定呢?
>>
>> Best,
>> Jingsong Lee
>>
>> On Wed, May 20, 2020 at 1:26 PM 张锴 <[hidden email]> wrote:
>>
>> > 我在测试将hive查询出的数据转换成DataSet[Row]时,出现一些无法解决的问题,代码和异常 在附件中,麻烦各位小伙伴给看一下。
>> >
>>
>>
>> --
>> Best, Jingsong Lee
>>
>

--
Best, Jingsong Lee
Reply | Threaded
Open this post in threaded view
|

Re: Flink convert Table to DataSet[Row]

张锴
  def main(args: Array[String]): Unit = {
    val tableEnvSettings = EnvironmentSettings.newInstance()
      .useBlinkPlanner()
      .inBatchMode()
      .build()

    val tableEnv: TableEnvironment =
TableEnvironment.create(tableEnvSettings)

    val catalog = new HiveCatalog(
      "myhive", // catalog name
      "mydatabase", // default database
      "D:\\data\\conf", // Hive config (hive-site.xml) directory
      "3.1.2" // Hive version
    )

    tableEnv.registerCatalog("myhive", catalog)
    tableEnv.useCatalog("myhive")
    tableEnv.getConfig.setSqlDialect(SqlDialect.HIVE)
    tableEnv.listTables().foreach(println)

    import org.apache.flink.table.api.scala._
    import org.apache.flink.api.scala._


    val mytable = tableEnv.from("mytable")
    val result = mytable
      .groupBy("pfid")
      .select("nv_mv", "pfid")
      .toDataSet[Row] // conversion to DataSet
      .print()

  }

  Exception in thread "main"
org.apache.flink.table.api.ValidationException: Only tables that originate
from Scala DataSets can be converted to Scala DataSets.
at
org.apache.flink.table.api.scala.TableConversions.toDataSet(TableConversions.scala:56)
at HiveService$.main(HiveService.scala:40)
at HiveService.main(HiveService.scala)

Jingsong Li <[hidden email]> 于2020年5月20日周三 下午2:06写道:

> 不好意思,
>
> 还是看不到你的图,可以考虑copy异常栈。
>
> 方便问一下后续的指标计算用Table/SQL搞不定吗?
>
> Best,
> Jingsong Lee
>
> On Wed, May 20, 2020 at 1:52 PM 张锴 <[hidden email]> wrote:
>
> > [image: 微信图片_20200520132244.png]
> > [image: 微信图片_20200520132343.png]
> >
> > Jingsong Li <[hidden email]> 于2020年5月20日周三 下午1:30写道:
> >
> >> Hi,
> >>
> >> 没看见有附件,请问为啥需要转车DateSet,Table里有啥搞不定呢?
> >>
> >> Best,
> >> Jingsong Lee
> >>
> >> On Wed, May 20, 2020 at 1:26 PM 张锴 <[hidden email]> wrote:
> >>
> >> > 我在测试将hive查询出的数据转换成DataSet[Row]时,出现一些无法解决的问题,代码和异常 在附件中,麻烦各位小伙伴给看一下。
> >> >
> >>
> >>
> >> --
> >> Best, Jingsong Lee
> >>
> >
>
> --
> Best, Jingsong Lee
>
Reply | Threaded
Open this post in threaded view
|

Re: Flink convert Table to DataSet[Row]

Jingsong Li
blink planner是不支持和Dataset的转换的。

Best,
Jingsong Lee

On Wed, May 20, 2020 at 2:49 PM 张锴 <[hidden email]> wrote:

>   def main(args: Array[String]): Unit = {
>     val tableEnvSettings = EnvironmentSettings.newInstance()
>       .useBlinkPlanner()
>       .inBatchMode()
>       .build()
>
>     val tableEnv: TableEnvironment =
> TableEnvironment.create(tableEnvSettings)
>
>     val catalog = new HiveCatalog(
>       "myhive", // catalog name
>       "mydatabase", // default database
>       "D:\\data\\conf", // Hive config (hive-site.xml) directory
>       "3.1.2" // Hive version
>     )
>
>     tableEnv.registerCatalog("myhive", catalog)
>     tableEnv.useCatalog("myhive")
>     tableEnv.getConfig.setSqlDialect(SqlDialect.HIVE)
>     tableEnv.listTables().foreach(println)
>
>     import org.apache.flink.table.api.scala._
>     import org.apache.flink.api.scala._
>
>
>     val mytable = tableEnv.from("mytable")
>     val result = mytable
>       .groupBy("pfid")
>       .select("nv_mv", "pfid")
>       .toDataSet[Row] // conversion to DataSet
>       .print()
>
>   }
>
>   Exception in thread "main"
> org.apache.flink.table.api.ValidationException: Only tables that originate
> from Scala DataSets can be converted to Scala DataSets.
> at
>
> org.apache.flink.table.api.scala.TableConversions.toDataSet(TableConversions.scala:56)
> at HiveService$.main(HiveService.scala:40)
> at HiveService.main(HiveService.scala)
>
> Jingsong Li <[hidden email]> 于2020年5月20日周三 下午2:06写道:
>
> > 不好意思,
> >
> > 还是看不到你的图,可以考虑copy异常栈。
> >
> > 方便问一下后续的指标计算用Table/SQL搞不定吗?
> >
> > Best,
> > Jingsong Lee
> >
> > On Wed, May 20, 2020 at 1:52 PM 张锴 <[hidden email]> wrote:
> >
> > > [image: 微信图片_20200520132244.png]
> > > [image: 微信图片_20200520132343.png]
> > >
> > > Jingsong Li <[hidden email]> 于2020年5月20日周三 下午1:30写道:
> > >
> > >> Hi,
> > >>
> > >> 没看见有附件,请问为啥需要转车DateSet,Table里有啥搞不定呢?
> > >>
> > >> Best,
> > >> Jingsong Lee
> > >>
> > >> On Wed, May 20, 2020 at 1:26 PM 张锴 <[hidden email]> wrote:
> > >>
> > >> > 我在测试将hive查询出的数据转换成DataSet[Row]时,出现一些无法解决的问题,代码和异常 在附件中,麻烦各位小伙伴给看一下。
> > >> >
> > >>
> > >>
> > >> --
> > >> Best, Jingsong Lee
> > >>
> > >
> >
> > --
> > Best, Jingsong Lee
> >
>


--
Best, Jingsong Lee
Reply | Threaded
Open this post in threaded view
|

Re: Flink convert Table to DataSet[Row]

张锴
我用的是flink1.10的,那意思只能用flink planner的方式了吗

Jingsong Li <[hidden email]> 于2020年5月20日周三 下午2:55写道:

> blink planner是不支持和Dataset的转换的。
>
> Best,
> Jingsong Lee
>
> On Wed, May 20, 2020 at 2:49 PM 张锴 <[hidden email]> wrote:
>
> >   def main(args: Array[String]): Unit = {
> >     val tableEnvSettings = EnvironmentSettings.newInstance()
> >       .useBlinkPlanner()
> >       .inBatchMode()
> >       .build()
> >
> >     val tableEnv: TableEnvironment =
> > TableEnvironment.create(tableEnvSettings)
> >
> >     val catalog = new HiveCatalog(
> >       "myhive", // catalog name
> >       "mydatabase", // default database
> >       "D:\\data\\conf", // Hive config (hive-site.xml) directory
> >       "3.1.2" // Hive version
> >     )
> >
> >     tableEnv.registerCatalog("myhive", catalog)
> >     tableEnv.useCatalog("myhive")
> >     tableEnv.getConfig.setSqlDialect(SqlDialect.HIVE)
> >     tableEnv.listTables().foreach(println)
> >
> >     import org.apache.flink.table.api.scala._
> >     import org.apache.flink.api.scala._
> >
> >
> >     val mytable = tableEnv.from("mytable")
> >     val result = mytable
> >       .groupBy("pfid")
> >       .select("nv_mv", "pfid")
> >       .toDataSet[Row] // conversion to DataSet
> >       .print()
> >
> >   }
> >
> >   Exception in thread "main"
> > org.apache.flink.table.api.ValidationException: Only tables that
> originate
> > from Scala DataSets can be converted to Scala DataSets.
> > at
> >
> >
> org.apache.flink.table.api.scala.TableConversions.toDataSet(TableConversions.scala:56)
> > at HiveService$.main(HiveService.scala:40)
> > at HiveService.main(HiveService.scala)
> >
> > Jingsong Li <[hidden email]> 于2020年5月20日周三 下午2:06写道:
> >
> > > 不好意思,
> > >
> > > 还是看不到你的图,可以考虑copy异常栈。
> > >
> > > 方便问一下后续的指标计算用Table/SQL搞不定吗?
> > >
> > > Best,
> > > Jingsong Lee
> > >
> > > On Wed, May 20, 2020 at 1:52 PM 张锴 <[hidden email]> wrote:
> > >
> > > > [image: 微信图片_20200520132244.png]
> > > > [image: 微信图片_20200520132343.png]
> > > >
> > > > Jingsong Li <[hidden email]> 于2020年5月20日周三 下午1:30写道:
> > > >
> > > >> Hi,
> > > >>
> > > >> 没看见有附件,请问为啥需要转车DateSet,Table里有啥搞不定呢?
> > > >>
> > > >> Best,
> > > >> Jingsong Lee
> > > >>
> > > >> On Wed, May 20, 2020 at 1:26 PM 张锴 <[hidden email]> wrote:
> > > >>
> > > >> > 我在测试将hive查询出的数据转换成DataSet[Row]时,出现一些无法解决的问题,代码和异常
> 在附件中,麻烦各位小伙伴给看一下。
> > > >> >
> > > >>
> > > >>
> > > >> --
> > > >> Best, Jingsong Lee
> > > >>
> > > >
> > >
> > > --
> > > Best, Jingsong Lee
> > >
> >
>
>
> --
> Best, Jingsong Lee
>
Reply | Threaded
Open this post in threaded view
|

Re: Flink convert Table to DataSet[Row]

Jingsong Li
可能也有问题,flink planner可能已经不支持hive connector了。

On Wed, May 20, 2020 at 2:57 PM 张锴 <[hidden email]> wrote:

> 我用的是flink1.10的,那意思只能用flink planner的方式了吗
>
> Jingsong Li <[hidden email]> 于2020年5月20日周三 下午2:55写道:
>
> > blink planner是不支持和Dataset的转换的。
> >
> > Best,
> > Jingsong Lee
> >
> > On Wed, May 20, 2020 at 2:49 PM 张锴 <[hidden email]> wrote:
> >
> > >   def main(args: Array[String]): Unit = {
> > >     val tableEnvSettings = EnvironmentSettings.newInstance()
> > >       .useBlinkPlanner()
> > >       .inBatchMode()
> > >       .build()
> > >
> > >     val tableEnv: TableEnvironment =
> > > TableEnvironment.create(tableEnvSettings)
> > >
> > >     val catalog = new HiveCatalog(
> > >       "myhive", // catalog name
> > >       "mydatabase", // default database
> > >       "D:\\data\\conf", // Hive config (hive-site.xml) directory
> > >       "3.1.2" // Hive version
> > >     )
> > >
> > >     tableEnv.registerCatalog("myhive", catalog)
> > >     tableEnv.useCatalog("myhive")
> > >     tableEnv.getConfig.setSqlDialect(SqlDialect.HIVE)
> > >     tableEnv.listTables().foreach(println)
> > >
> > >     import org.apache.flink.table.api.scala._
> > >     import org.apache.flink.api.scala._
> > >
> > >
> > >     val mytable = tableEnv.from("mytable")
> > >     val result = mytable
> > >       .groupBy("pfid")
> > >       .select("nv_mv", "pfid")
> > >       .toDataSet[Row] // conversion to DataSet
> > >       .print()
> > >
> > >   }
> > >
> > >   Exception in thread "main"
> > > org.apache.flink.table.api.ValidationException: Only tables that
> > originate
> > > from Scala DataSets can be converted to Scala DataSets.
> > > at
> > >
> > >
> >
> org.apache.flink.table.api.scala.TableConversions.toDataSet(TableConversions.scala:56)
> > > at HiveService$.main(HiveService.scala:40)
> > > at HiveService.main(HiveService.scala)
> > >
> > > Jingsong Li <[hidden email]> 于2020年5月20日周三 下午2:06写道:
> > >
> > > > 不好意思,
> > > >
> > > > 还是看不到你的图,可以考虑copy异常栈。
> > > >
> > > > 方便问一下后续的指标计算用Table/SQL搞不定吗?
> > > >
> > > > Best,
> > > > Jingsong Lee
> > > >
> > > > On Wed, May 20, 2020 at 1:52 PM 张锴 <[hidden email]> wrote:
> > > >
> > > > > [image: 微信图片_20200520132244.png]
> > > > > [image: 微信图片_20200520132343.png]
> > > > >
> > > > > Jingsong Li <[hidden email]> 于2020年5月20日周三 下午1:30写道:
> > > > >
> > > > >> Hi,
> > > > >>
> > > > >> 没看见有附件,请问为啥需要转车DateSet,Table里有啥搞不定呢?
> > > > >>
> > > > >> Best,
> > > > >> Jingsong Lee
> > > > >>
> > > > >> On Wed, May 20, 2020 at 1:26 PM 张锴 <[hidden email]> wrote:
> > > > >>
> > > > >> > 我在测试将hive查询出的数据转换成DataSet[Row]时,出现一些无法解决的问题,代码和异常
> > 在附件中,麻烦各位小伙伴给看一下。
> > > > >> >
> > > > >>
> > > > >>
> > > > >> --
> > > > >> Best, Jingsong Lee
> > > > >>
> > > > >
> > > >
> > > > --
> > > > Best, Jingsong Lee
> > > >
> > >
> >
> >
> > --
> > Best, Jingsong Lee
> >
>


--
Best, Jingsong Lee