sql topN ArrayIndexOutOfBoundsException


1101300123
I am using the TopN syntax in my Flink SQL job. After the program had been running for a while, the following error appeared:




[flink-akka.actor.default-dispatcher-8695] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=1], partitionBy=[contact_id, service_no], orderBy=[operate_time DESC], select=[operate_time, contact_id, call_id, sheet_id, code_contact_channel, start_time, service_no, record_file, contact_length, user_code, user_name, customer_name]) -> Calc(select=[contact_id, start_time, contact_length, service_no, call_id, user_code, user_name, customer_name, sheet_id, record_file, code_contact_channel]) (1/1) (52b8519ad9a44832a283c1760f385bf6) switched from RUNNING to FAILED.
java.lang.ArrayIndexOutOfBoundsException: -1
at java.util.ArrayList.elementData(ArrayList.java:422)
at java.util.ArrayList.remove(ArrayList.java:499)
at org.apache.flink.table.runtime.operators.rank.AppendOnlyTopNFunction.processElementWithoutRowNumber(AppendOnlyTopNFunction.java:205)
at org.apache.flink.table.runtime.operators.rank.AppendOnlyTopNFunction.processElement(AppendOnlyTopNFunction.java:120)
at org.apache.flink.table.runtime.operators.rank.AppendOnlyTopNFunction.processElement(AppendOnlyTopNFunction.java:46)
at org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:85)
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:173)
at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:151)
at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:128)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:69)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:311)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:187)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:487)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)




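I looked at the corresponding function in the source code and found that it never checks whether the list is empty. My understanding is that the code assumes the list always contains data, so it calls remove without an emptiness check: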
private void processElementWithoutRowNumber(BaseRow input, Collector<BaseRow> out) throws Exception {
  // remove retired element
  if (buffer.getCurrentTopNum() > rankEnd) {
   Map.Entry<BaseRow, Collection<BaseRow>> lastEntry = buffer.lastEntry();
   BaseRow lastKey = lastEntry.getKey();
   List<BaseRow> lastList = (List<BaseRow>) lastEntry.getValue();
   // remove last one
   BaseRow lastElement = lastList.remove(lastList.size() - 1);
   if (lastList.isEmpty()) {
    buffer.removeAll(lastKey);
    dataState.remove(lastKey);
   } else {
    dataState.put(lastKey, lastList);
   }
   if (input.equals(lastElement)) {
    return;
   } else {
    // lastElement shouldn't be null
    delete(out, lastElement);
   }
  }
  collect(out, input);
 }
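How does this code guarantee that the map always contains data? I am not sure whether this is a bug or whether my data is the problem. But even if my data contains empty values, that should not affect the program, should it?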
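
For reference, the exception itself can be reproduced outside Flink. The sketch below assumes only that the list is already empty when `remove(size() - 1)` is called, as the stack trace suggests. The class and method names (`EmptyListRemoveDemo`, `removeLast`, `removeLastGuarded`) are hypothetical and not part of Flink, and the guarded variant is only an illustration of an emptiness check, not a proposed fix for `AppendOnlyTopNFunction`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical demo class, not part of Flink.
final class EmptyListRemoveDemo {

    // Mirrors the unguarded call in the quoted snippet: remove the last element.
    // On an empty list the index is -1, so the call throws.
    static <T> T removeLast(List<T> list) {
        return list.remove(list.size() - 1);
    }

    // Illustrative guarded variant: returns null instead of throwing
    // when the list is null or empty.
    static <T> T removeLastGuarded(List<T> list) {
        if (list == null || list.isEmpty()) {
            return null;
        }
        return list.remove(list.size() - 1);
    }

    public static void main(String[] args) {
        List<String> empty = new ArrayList<>();
        try {
            removeLast(empty);
        } catch (IndexOutOfBoundsException e) {
            // The exact subclass and message depend on the JDK version.
            System.out.println("unguarded remove failed: " + e);
        }
        System.out.println("guarded remove returned: " + removeLastGuarded(empty));
    }
}
```

The stack trace above shows `ArrayIndexOutOfBoundsException: -1` thrown from `ArrayList.elementData`; other JDK versions may report the same failure as a plain `IndexOutOfBoundsException`, which is why the sketch catches the common supertype. This only shows how the exception arises from an empty list; it does not explain why the list would be empty inside the operator.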