|
你好:
flink1.10版本,实时程序执行如下查询写入hbase,报错,烦请看下原因
Exception in thread "main" org.apache.flink.table.api.TableException: UpsertStreamTableSink requires that Table has a full primary keys if it is updated.
s"""
|INSERT INTO ${databaseName}.response_time_sink
|SELECT
| rowkey,
| ROW(`day`, `time`, initialize_route_avg_time, update_detour_avg_time, replace_avg_time, deviate_avg_time) AS cf
|FROM
|(
| select CONCAT_WS('_',CAST(`time` AS VARCHAR),distance_range) rowkey,`day`,`time`,
| MAX(CASE req_type WHEN '0' THEN num else 0 END) initialize_route_avg_time,
| MAX(CASE req_type WHEN '1' THEN num else 0 END) update_detour_avg_time,
| MAX(CASE req_type WHEN '2' THEN num else 0 END) replace_avg_time,
| MAX(CASE req_type WHEN '3' THEN num else 0 END) deviate_avg_time
| from
| (SELECT DATE_FORMAT(TIMESTAMPADD(HOUR, 8, TUMBLE_START(proctime, INTERVAL '10' SECOND)), 'yyyy-MM-dd') `day`,
| UNIX_TIMESTAMP(DATE_FORMAT(TIMESTAMPADD(HOUR, 8, TUMBLE_START(proctime, INTERVAL '10' SECOND)), 'yyyy-MM-dd HH:mm:ss')) * 1000 AS `time`,
| req_type,
| (CASE WHEN ResponseRemainingMile<=50 THEN '1'
| WHEN ResponseRemainingMile> 50 AND ResponseRemainingMile<= 250 THEN '2'
| WHEN ResponseRemainingMile> 250 AND ResponseRemainingMile<= 500 THEN '3'
| WHEN ResponseRemainingMile> 500 THEN '4' END) as distance_range,
| CAST(AVG(`value`) AS INT) num
| FROM
| ${databaseName}.metric_stream
| WHERE
| metric = 'http_common_request'
| GROUP BY
| TUMBLE(proctime, INTERVAL '10' SECOND),req_type,(CASE WHEN ResponseRemainingMile<=50 THEN '1'
| WHEN ResponseRemainingMile> 50 AND ResponseRemainingMile<= 250 THEN '2'
| WHEN ResponseRemainingMile> 250 AND ResponseRemainingMile<= 500 THEN '3'
| WHEN ResponseRemainingMile> 500 THEN '4' END))
| group by CONCAT_WS('_',CAST(`time` AS VARCHAR),distance_range),`day`,`time`
|) a
|""".stripMargin 期待你的答复,谢谢!
|