hi all,
blink模式下没法将table 转为 dataset , 所以如果直接collect了, 请问有类似方法可以获取到 结果用于代码调试么? -- Best, Jun Su |
hi all,
找到了源码中BatchTableEnvUtil类使用了CollectTableSink来做collect的逻辑, 但是下方代码运用了源码内部的private方法, 看起来不允许外部调用: def collect[T]( tEnv: TableEnvironment, table: Table, sink: CollectTableSink[T], jobName: Option[String]): Seq[T] = { val typeSerializer = fromDataTypeToLegacyInfo(sink.getConsumedDataType) .asInstanceOf[TypeInformation[T]] .createSerializer(tEnv.asInstanceOf[TableEnvironmentImpl] .getPlanner.asInstanceOf[PlannerBase].getExecEnv.getConfig) val id = new AbstractID().toString sink.init(typeSerializer.asInstanceOf[TypeSerializer[T]], id) val sinkName = UUID.randomUUID().toString tEnv.registerTableSink(sinkName, sink) tEnv.insertInto(table, sinkName) val res = tEnv.execute("test") val accResult: JArrayList[Array[Byte]] = res.getAccumulatorResult(id) SerializedListAccumulator.deserializeList(accResult, typeSerializer) } jun su <[hidden email]> 于2020年4月24日周五 下午2:05写道: > hi all, > > blink模式下没法将table 转为 dataset , 所以如果直接collect了, 请问有类似方法可以获取到 > 结果用于代码调试么? > > -- > Best, > Jun Su > -- Best, Jun Su |
1.10里面有TableUtils了,里面有collectToList
Best, Jingsong Lee On Fri, Apr 24, 2020 at 2:49 PM jun su <[hidden email]> wrote: > hi all, > > 找到了源码中BatchTableEnvUtil类使用了CollectTableSink来做collect的逻辑, > 但是下方代码运用了源码内部的private方法, 看起来不允许外部调用: > > def collect[T]( > tEnv: TableEnvironment, > table: Table, > sink: CollectTableSink[T], > jobName: Option[String]): Seq[T] = { > val typeSerializer = fromDataTypeToLegacyInfo(sink.getConsumedDataType) > .asInstanceOf[TypeInformation[T]] > .createSerializer(tEnv.asInstanceOf[TableEnvironmentImpl] > .getPlanner.asInstanceOf[PlannerBase].getExecEnv.getConfig) > val id = new AbstractID().toString > sink.init(typeSerializer.asInstanceOf[TypeSerializer[T]], id) > val sinkName = UUID.randomUUID().toString > tEnv.registerTableSink(sinkName, sink) > tEnv.insertInto(table, sinkName) > > val res = tEnv.execute("test") > val accResult: JArrayList[Array[Byte]] = res.getAccumulatorResult(id) > SerializedListAccumulator.deserializeList(accResult, typeSerializer) > } > > > jun su <[hidden email]> 于2020年4月24日周五 下午2:05写道: > > > hi all, > > > > blink模式下没法将table 转为 dataset , 所以如果直接collect了, 请问有类似方法可以获取到 > > 结果用于代码调试么? > > > > -- > > Best, > > Jun Su > > > > > -- > Best, > Jun Su > -- Best, Jingsong Lee |
非常感谢, 我用的flink-1.9.2 , 但是直接将代码copy过来可以用了!
Jingsong Li <[hidden email]> 于2020年4月24日周五 下午3:02写道: > 1.10里面有TableUtils了,里面有collectToList > > > Best, > Jingsong Lee > > On Fri, Apr 24, 2020 at 2:49 PM jun su <[hidden email]> wrote: > > > hi all, > > > > 找到了源码中BatchTableEnvUtil类使用了CollectTableSink来做collect的逻辑, > > 但是下方代码运用了源码内部的private方法, 看起来不允许外部调用: > > > > def collect[T]( > > tEnv: TableEnvironment, > > table: Table, > > sink: CollectTableSink[T], > > jobName: Option[String]): Seq[T] = { > > val typeSerializer = fromDataTypeToLegacyInfo(sink.getConsumedDataType) > > .asInstanceOf[TypeInformation[T]] > > .createSerializer(tEnv.asInstanceOf[TableEnvironmentImpl] > > .getPlanner.asInstanceOf[PlannerBase].getExecEnv.getConfig) > > val id = new AbstractID().toString > > sink.init(typeSerializer.asInstanceOf[TypeSerializer[T]], id) > > val sinkName = UUID.randomUUID().toString > > tEnv.registerTableSink(sinkName, sink) > > tEnv.insertInto(table, sinkName) > > > > val res = tEnv.execute("test") > > val accResult: JArrayList[Array[Byte]] = res.getAccumulatorResult(id) > > SerializedListAccumulator.deserializeList(accResult, typeSerializer) > > } > > > > > > jun su <[hidden email]> 于2020年4月24日周五 下午2:05写道: > > > > > hi all, > > > > > > blink模式下没法将table 转为 dataset , 所以如果直接collect了, 请问有类似方法可以获取到 > > > 结果用于代码调试么? > > > > > > -- > > > Best, > > > Jun Su > > > > > > > > > -- > > Best, > > Jun Su > > > > > -- > Best, Jingsong Lee > -- Best, Jun Su |
Free forum by Nabble | Edit this page |