Hi,
The PostgreSQL columns are mixed case.

postgres_sink = """
CREATE TABLE alarm_history_data (
  `recordId` STRING,
  `rowtime` TIMESTAMP(3),
  `action` STRING,
  `originalState` STRING,
  `newState` STRING,
  `originalCause` STRING,
  `newCause` STRING,
  `ser_name` STRING,
  `enb` STRING,
  `eventTime` STRING,
  `ceasedTime` STRING,
  `duration` STRING,
  `acked` STRING,
  `pmdId` STRING,
  `pmdTime` STRING,
  PRIMARY KEY (`recordId`) NOT ENFORCED
) WITH (
  'connector.type' = 'jdbc',
  'connector.url' = 'jdbc:postgresql://127.0.0.1:5432/postgres',
  'connector.table' = 'alarm_history_data',
  'connector.driver' = 'org.postgresql.Driver',
  'connector.username' = 'postgres',
  'connector.password' = 'my_password',
  'connector.write.flush.max-rows' = '1'
)
"""

st_env.scan("source").group_by("recordId").select(
    "recordId,"
    "last_value(actionTime) as rowtime, last_value(action),"
    "last_value(originalState) as originalState, last_value(newState),"
    "last_value(originalCause), last_value(newCause),"
    "last_value(ser_name), last_value(enb), last_value(eventTime),"
    "last_value(ceasedTime), last_value(duration), last_value(acked),"
    "last_value(pmdId), last_value(pmdTime)"
).insert_into("alarm_history_data")

The sink fails with the following error:

Caused by: java.sql.BatchUpdateException: Batch entry 0 INSERT INTO
alarm_history_data(recordId, rowtime, action, originalState, newState,
originalCause, newCause, ser_name, enb, eventTime, ceasedTime, duration,
acked, pmdId, pmdTime) VALUES ('47357607', '2020-06-03 17:37:44+08',
'Insert', '', 'cleared', '', 'crash', 'Oyama_ENM_MS',
'789198-houshakuzi-RBS6302', '2020-06-03T17:24:57', '2020-06-03T17:29:50',
'293.0', 'No', '0x80000002', '2020-06-03T17:22:46') ON CONFLICT (recordId)
DO UPDATE SET recordId=EXCLUDED.recordId, rowtime=EXCLUDED.rowtime,
action=EXCLUDED.action, originalState=EXCLUDED.originalState,
newState=EXCLUDED.newState, originalCause=EXCLUDED.originalCause,
newCause=EXCLUDED.newCause, ser_name=EXCLUDED.ser_name, enb=EXCLUDED.enb,
eventTime=EXCLUDED.eventTime, ceasedTime=EXCLUDED.ceasedTime,
duration=EXCLUDED.duration, acked=EXCLUDED.acked, pmdId=EXCLUDED.pmdId,
pmdTime=EXCLUDED.pmdTime was aborted: ERROR: column "recordid" of relation
"alarm_history_data" does not exist

How can I solve this? How can I get quotes around the column names in the
final SQL statement?

-- Sent from: http://apache-flink.147419.n8.nabble.com/
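For background on the error above: PostgreSQL folds unquoted identifiers to lowercase, so the generated INSERT's unquoted recordId is looked up as recordid, which does not match the quoted mixed-case column "recordId". As a minimal sketch of what quoting the identifiers in the generated statement would amount to (plain Python for illustration, not Flink connector API; the function names are hypothetical):

```python
def quote_ident(name: str) -> str:
    """Wrap a PostgreSQL identifier in double quotes so its case is preserved.

    Embedded double quotes are doubled, per SQL identifier quoting rules.
    """
    return '"' + name.replace('"', '""') + '"'


def build_insert(table: str, columns: list[str]) -> str:
    """Build an INSERT statement with every identifier quoted."""
    cols = ", ".join(quote_ident(c) for c in columns)
    params = ", ".join("?" for _ in columns)
    return f"INSERT INTO {quote_ident(table)} ({cols}) VALUES ({params})"


# With quoting, "recordId" keeps its case instead of folding to recordid:
print(build_insert("alarm_history_data", ["recordId", "rowtime"]))
# → INSERT INTO "alarm_history_data" ("recordId", "rowtime") VALUES (?, ?)
```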
Hi, is the schema of your alarm_history_data table in postgres "public"?

If not, you need to declare the schema name explicitly as part of the table
name. For example, if the schema is sch1, then in Flink SQL the table needs
to be defined as:

CREATE TABLE `sch1.alarm_history_data` ( ... ) WITH ( ... );
SELECT * FROM `sch1.alarm_history_data`;

Best,
Jark

On Tue, 4 Aug 2020 at 14:58, lgs <[hidden email]> wrote:
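One way to confirm which schema the table actually lives in (run in psql against the postgres database from the DDL above; a sketch using the standard information_schema views):

```sql
-- Lists the schema(s) containing a table named alarm_history_data
SELECT table_schema, table_name
FROM information_schema.tables
WHERE table_name = 'alarm_history_data';
```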
The schema is public.

The problem is here: ERROR: column "recordid" of relation
"alarm_history_data" does not exist. The column in the database table is
"recordId", but the error message shows "recordid".
I think the cause is that postgres folds column names to lowercase by
default when a table is created, so you need to declare them in lowercase
on the Flink SQL side as well. You can check the table's column information
in postgres.

Best,
Jark

On Fri, 7 Aug 2020 at 13:48, lgs <[hidden email]> wrote:
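The column names as PostgreSQL actually stored them can be inspected with `\d alarm_history_data` in psql, or with a query against information_schema, for example:

```sql
-- Shows the exact (case-sensitive) column names stored in the catalog;
-- a column created unquoted as recordId will appear here as recordid.
SELECT column_name, data_type
FROM information_schema.columns
WHERE table_name = 'alarm_history_data'
ORDER BY ordinal_position;
```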