UDTF函数

classic Classic list List threaded Threaded
1 message Options
Reply | Threaded
Open this post in threaded view
|

UDTF函数

liu_mingzhang
这是我写的UDTF函数:
class split2rows_PA01CH extends TableFunction[Row] {

def eval(ReportNo: String, field1: String, field2: String): Unit = {
val field1rows = field1.split(",", -1)
val field2rows = field2.split(",", -1)
for (x <- 0 until field1rows.length) {
val row = new Row(3)
row.setField(0, ReportNo)
row.setField(1, (x + 1).toString)
row.setField(2, field1rows(x))
row.setField(3, field2rows(x))
collect(row)
}
}

override def getResultType: TypeInformation[Row] ={
Types.ROW(Array("ReportNo", "ItemNo", "PA01CD01", "PA01CI01"),Array(Types.STRING, Types.STRING, Types.STRING, Types.STRING).asInstanceOf[Array[TypeInformation[_]]])
}

}



但是运行的时候竟然报错:
Exception in thread "main" java.lang.RuntimeException: The Nothing type cannot have a serializer.
	at org.apache.flink.api.common.typeinfo.NothingTypeInfo.createSerializer(NothingTypeInfo.java:74)
	at org.apache.flink.api.java.typeutils.RowTypeInfo.createSerializer(RowTypeInfo.java:240)
	at org.apache.flink.table.runtime.types.CRowTypeInfo.createSerializer(CRowTypeInfo.scala:57)
	at org.apache.flink.streaming.api.graph.StreamGraph.addOperator(StreamGraph.java:207)
	at org.apache.flink.streaming.api.graph.StreamGraphGenerator.transformOneInputTransform(StreamGraphGenerator.java:539)
	at org.apache.flink.streaming.api.graph.StreamGraphGenerator.transform(StreamGraphGenerator.java:166)
	at org.apache.flink.streaming.api.graph.StreamGraphGenerator.generateInternal(StreamGraphGenerator.java:132)
	at org.apache.flink.streaming.api.graph.StreamGraphGenerator.generate(StreamGraphGenerator.java:124)
	at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1538)
	at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:89)
	at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:654)
	at com.aibank.personbank.udtf.test$.main(test.scala:42)
	at com.aibank.personbank.udtf.test.main(test.scala)


我DEBUG发现,在执行createSerializer的时候,输出的ROW没有被构建RowTypeInfo(见附件),难道我写的UDTF函数有什么问题吗, getResultType不就是给Row构建RowTypeInfo的吗
跪求大佬解答,万分感谢!