|
Hi, dear all:
为什么我通过 flatMap(自定义类继承 RichFlatMapFunction)写 ListState 时,一直报 java.lang.NoClassDefFoundError?
// Project each record to the pair held in its fourth field, key the stream by the
// pair's second element, then apply the stateful flat-map to the keyed stream.
stream
  .map { record =>
    val pair = record._4
    (pair._1, pair._2)
  }
  .keyBy(pair => pair._2)
  .flatMap(new ReceptionListStateFunction2)
/**
 * Flat-map function that accumulates strings in a Flink `ListState` and emits the
 * complete accumulated list for every incoming element.
 *
 * Each input is a `(String, Int)` pair: the string is appended to the state only when
 * the integer equals 1; the current state contents are emitted regardless.
 *
 * The state is obtained from the runtime context, so this function is meant to be
 * applied to a keyed stream (i.e. after `keyBy`).
 *
 * NOTE(review): emitting `List[String]` makes Flink's Scala API build a
 * `TraversableSerializer`, which appears to compile code at runtime via the Scala
 * toolbox; a `NoClassDefFoundError` for `scala.tools.nsc.Properties$` there presumably
 * indicates a missing or mismatched scala-compiler/scala-reflect jar on the classpath
 * rather than a problem in this class — confirm against the project's dependencies.
 */
class ReceptionListStateFunction2 extends RichFlatMapFunction[(String, Int), List[String]] {

  // Assigned in open(): the runtime context is not available at construction time.
  var myState: ListState[String] = _

  /**
   * Registers the list state with the runtime context before any element is processed.
   * Without this, `myState` stays null and `flatMap` fails with a NullPointerException.
   *
   * @param parameters configuration passed in by the Flink runtime (unused here)
   */
  override def open(parameters: org.apache.flink.configuration.Configuration): Unit = {
    val descriptor = new org.apache.flink.api.common.state.ListStateDescriptor[String](
      "reception-list-state",
      classOf[String]
    )
    myState = getRuntimeContext.getListState(descriptor)
  }

  /**
   * Optionally appends the incoming string to the state, then emits a snapshot of
   * everything stored so far.
   *
   * @param value pair of (string to remember, flag); the string is stored only if flag == 1
   * @param out   collector receiving the current state contents as an immutable list
   */
  override def flatMap(value: (String, Int), out: Collector[List[String]]): Unit = {
    if (value._2 == 1) {
      myState.add(value._1)
    }

    val listBuf: ListBuffer[String] = new ListBuffer[String]()
    // Some Flink versions/state backends are documented to return null for empty
    // state, so guard before iterating — TODO confirm for the version in use.
    val stored = myState.get()
    if (stored != null) {
      val states = stored.iterator()
      while (states.hasNext) {
        listBuf.append(states.next())
      }
    }
    out.collect(listBuf.toList)
  }
}
错误日志:
Exception in thread "main" java.util.concurrent.ExecutionException: scala.tools.reflect.ToolBoxError: reflective compilation has failed: cannot initialize the compiler due to java.lang.NoClassDefFoundError: Could not initialize class scala.tools.nsc.Properties$
at org.apache.flink.shaded.guava18.com.google.common.util.concurrent.AbstractFuture$Sync.getValue(AbstractFuture.java:299)
at org.apache.flink.shaded.guava18.com.google.common.util.concurrent.AbstractFuture$Sync.get(AbstractFuture.java:286)
at org.apache.flink.shaded.guava18.com.google.common.util.concurrent.AbstractFuture.get(AbstractFuture.java:116)
at org.apache.flink.shaded.guava18.com.google.common.util.concurrent.Uninterruptibles.getUninterruptibly(Uninterruptibles.java:137)
at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.getAndRecordStats(LocalCache.java:2348)
at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2320)
at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2282)
at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2197)
at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache.get(LocalCache.java:3937)
at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4739)
at org.apache.flink.api.scala.typeutils.TraversableSerializer$.compileCbf(TraversableSerializer.scala:184)
at org.apache.flink.api.scala.typeutils.TraversableSerializer.compileCbf(TraversableSerializer.scala:51)
at org.apache.flink.api.scala.typeutils.TraversableSerializer.<init>(TraversableSerializer.scala:41)
|