Administrator
|
Hi,
从异常来看,应该是少了如下这一行: outputFormatStatus = JdbcRowDataOutputFormat.dynamicOutputFormatBuilder() .setJdbcOptions(jdbcOptions) .setFieldDataTypes(fieldDataTypes) .setJdbcDmlOptions(dmlOptions) .setJdbcExecutionOptions(JdbcExecutionOptions.builder().build()) .setRowDataTypeInfo(rowDataTypeInfo) // 少了这一行 .build(); 顺便提醒下, `RowDataTypeInfo` 和 JdbcRowDataOutputFormat 都是内部类,不保证跨版本的兼容性(其实,在下个版本,这两个类都被重构了)。 Best, Jark On Tue, 14 Jul 2020 at 17:38, jindy_liu <[hidden email]> wrote: > > 代码,编译没问题,但运行的时候,RichFlatMapFunction在open的时候,JdbcRowDataOutputFormat.open会core,说RuntimeContext为空,如果去掉outputFormatStatus.setRuntimeContext(this.getRuntimeContext()),又会提示没有初始化? > > 麻烦大佬帮看看,什么问题啊,是我哪里用的不对吗? > > > at > > akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > Caused by: java.lang.NullPointerException > at > > org.apache.flink.connector.jdbc.table.JdbcRowDataOutputFormat.createSimpleRowDataExecutor(JdbcRowDataOutputFormat.java:198) > at > > org.apache.flink.connector.jdbc.table.JdbcRowDataOutputFormat.lambda$new$2d156164$1(JdbcRowDataOutputFormat.java:94) > at > > org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.createAndOpenStatementExecutor(JdbcBatchingOutputFormat.java:131) > at > > org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.open(JdbcBatchingOutputFormat.java:113) > at > > org.apache.flink.connector.jdbc.table.JdbcRowDataOutputFormat.open(JdbcRowDataOutputFormat.java:103) > at > > com.qqmusic.quku.cdcSync.PostgresSinkMapFunction.open(PostgresSinkMapFunction.java:132) > at > > org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36) > at > > org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102) > at > > org.apache.flink.streaming.api.operators.StreamFlatMap.open(StreamFlatMap.java:43) > at > > org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:291) > at > > 
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:473) > at > > org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92) > at > > org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:469) > at > > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:522) > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) > at java.lang.Thread.run(Thread.java:748) > > > > 代码=====> > > public class PostgresSinkMapFunction extends RichFlatMapFunction<String, > String> { > private static String driverClass = "org.postgresql.Driver"; > private static String dbUrl = > "jdbc:postgresql://localhost:5432/ai_audio_lyric_task"; > private static String userNmae = "postgres"; > private static String passWord = "1"; > > // 表status > private static JdbcRowDataOutputFormat outputFormatStatus; > private static String[] fieldNames = new String[] {"id", "name"}; > private static DataType[] fieldDataTypes = new DataType[]{ > DataTypes.INT(), > DataTypes.STRING()}; > > private static RowType rowType = RowType.of( > Arrays.stream(fieldDataTypes) > .map(DataType::getLogicalType) > .toArray(LogicalType[]::new), > fieldNames); > private static RowDataTypeInfo rowDataTypeInfo = > RowDataTypeInfo.of(rowType); > > @Override > public void flatMap(String s, Collector<String> collector) throws > Exception { > GenericRowData row = new GenericRowData(2); > > row.setRowKind(INSERT); > row.setField(0, count); > row.setField(1, "jindy" + Integer.toString(count)); > > outputFormatStatus.writeRecord(row); > > } > > public void open(Configuration parameters) throws Exception { > super.open(parameters); > > JdbcOptions jdbcOptions = JdbcOptions.builder() > .setDriverName(driverClass) > .setDBUrl(dbUrl) > .setTableName("status_mirror") > .setUsername(userNmae) > 
.setPassword(passWord) > .build(); > > JdbcDmlOptions dmlOptions = JdbcDmlOptions.builder() > .withTableName(jdbcOptions.getTableName()) > .withDialect(jdbcOptions.getDialect()) > .withFieldNames(fieldNames) > .build(); > > outputFormatStatus = > JdbcRowDataOutputFormat.dynamicOutputFormatBuilder() > .setJdbcOptions(jdbcOptions) > .setFieldDataTypes(fieldDataTypes) > .setJdbcDmlOptions(dmlOptions) > > .setJdbcExecutionOptions(JdbcExecutionOptions.builder().build()) > .build(); > > // set context,这里有问题!!!!!!!!!!!!!!!!!! > outputFormatStatus.setRuntimeContext(this.getRuntimeContext()); > outputFormatStatus.open(0, 1); > } > > public void close() throws Exception { > super.close(); > outputFormatStatus.close(); > } > } > > > > > > -- > Sent from: http://apache-flink.147419.n8.nabble.com/ > |
确实是这行导致的。
如果这两个类都被重构了,那应该怎么用比较好? 我需要知道每一行对应的是 insert、update 还是 delete 事件。 或者换个问法:使用这类 API 时,一般遵守什么规则,才能让 Flink 的版本兼容性更好? -- Sent from: http://apache-flink.147419.n8.nabble.com/ |
Free forum by Nabble | Edit this page |