Reading an HBase table with the DataStream API + HBaseRowInputFormat fails with: Row arity of from (2) does not match this serializers field length (1)


automths
Hi:
Hello, when reading an HBase table with the DataStream API, I used HBaseRowInputFormat and built the schema with HBaseTableSchema. The code is as follows:


val env = StreamExecutionEnvironment.getExecutionEnvironment

// Build a schema with the row key column and one column family.
val hbaseTableSchema = TableSchema.builder()
  .add(TableColumn.of("id", DataTypes.STRING()))
  .add(TableColumn.of("f1", DataTypes.ROW(DataTypes.FIELD("value", DataTypes.STRING()))))
  .build()
val schema = HBaseTableSchema.fromTableSchema(hbaseTableSchema)

val ds: DataStream[Row] = env.createInput(new HBaseRowInputFormat(
  hbaseConfig(),
  tableName,
  schema))
ds.print()
env.execute(this.getClass.getSimpleName)
At runtime it failed with the following error:

 java.lang.RuntimeException: Row arity of from (2) does not match this serializers field length (1).
at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:113)
at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:58)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:715)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
at org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.run(InputFormatSourceFunction.java:93)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:213)
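The exception is thrown by the arity check in RowSerializer.copy: the serializer was built from the produced type (arity 1, one column family), while the source emits rows of arity 2 (row key plus family). A minimal self-contained sketch of that check (this mimics the behavior; it is not Flink's actual code):

```java
public class ArityCheckDemo {
    // Mimics RowSerializer.copy's arity check: the incoming row's field count
    // must equal the field length the serializer was constructed with.
    static void copy(Object[] rowFields, int serializerFieldLength) {
        if (rowFields.length != serializerFieldLength) {
            throw new RuntimeException("Row arity of from (" + rowFields.length
                + ") does not match this serializers field length ("
                + serializerFieldLength + ").");
        }
    }

    public static void main(String[] args) {
        // The input format emits rows of arity 2: (rowkey, f1-row) ...
        Object[] row = new Object[] {"id-1", new Object[] {"value"}};
        // ... but getProducedType declared only the column family, so the
        // serializer expects field length 1 and the check fails.
        try {
            copy(row, 1);
        } catch (RuntimeException e) {
            System.out.println(e.getMessage());
        }
    }
}
```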




I tracked the cause down to this method in the HBaseRowInputFormat source:
@Override
public TypeInformation<Row> getProducedType() {
    // split the fieldNames
    String[] famNames = schema.getFamilyNames();
    TypeInformation<?>[] typeInfos = new TypeInformation[famNames.length];
    int i = 0;
    for (String family : famNames) {
        typeInfos[i] = new RowTypeInfo(
            schema.getQualifierTypes(family),
            schema.getQualifierNames(family));
        i++;
    }
    return new RowTypeInfo(typeInfos, famNames);
}
When building the TypeInformation here, the row key's type is never added: the produced type covers only the column families, so its arity is one less than that of the rows the format actually emits.
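A fix would need getProducedType to include the row key at its configured index alongside the families. Leaving Flink's TypeInformation classes aside, the index bookkeeping can be sketched with plain name arrays (the class and method here are hypothetical, for illustration only):

```java
public class ProducedTypeSketch {
    // Build the full field-name list: the column families plus the row key
    // inserted at its index (names stand in for the TypeInformation entries).
    static String[] fieldNames(String rowKeyName, int rowKeyIndex, String[] famNames) {
        String[] names = new String[famNames.length + 1];
        int j = 0;
        for (int i = 0; i < names.length; i++) {
            names[i] = (i == rowKeyIndex) ? rowKeyName : famNames[j++];
        }
        return names;
    }

    public static void main(String[] args) {
        // For the schema in the question: row key "id" first, then family "f1",
        // giving a produced type of arity 2 that matches the emitted rows.
        String[] names = fieldNames("id", 0, new String[] {"f1"});
        System.out.println(String.join(",", names)); // prints "id,f1"
    }
}
```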


So, is this a bug?




Best regards,
automths



Re: Reading an HBase table with the DataStream API + HBaseRowInputFormat fails with: Row arity of from (2) does not match this serializers field length (1)

Jessica J.Wang
You can refer to the implementation inside HBaseTableSource:

HBaseTableSchema hbaseSchema = new HBaseTableSchema();
hbaseSchema.addColumn(xxx);
hbaseSchema.setRowKey(xxx);

execEnv.createInput(new HBaseRowInputFormat(conf, tableName, hbaseSchema),
        getReturnType())
    .name(explainSource());



--
Sent from: http://apache-flink.147419.n8.nabble.com/