大家好:
我在测试Queryable State功能的时候,发现 语法 dataStream.keyBy(key).process(); 这种语法下,简单的状态和复杂的POJO都可以查询; 但在 studentAnswerDataStream.connect(learningStrategyDataStream) .keyBy(val->val.getCourseId()+"_"+val.getTaskId() , val->val.getCourseId()+"_"+val.getTaskId()) .process() 这种语法情况下,简单的状态可以查询,但复杂的POJO无法反序列化回来。 错误: Exception in thread "main" java.util.concurrent.ExecutionException: java.lang.RuntimeException: Failed request 0. Caused by: java.lang.RuntimeException: Failed request 9. Caused by: java.lang.RuntimeException: Error while processing request with ID 9. Caused by: java.io.IOException: Unable to deserialize key and namespace. This indicates a mismatch in the key/namespace serializers used by the KvState instance and this access. at org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer.deserializeKeyAndNamespace(KvStateSerializer.java:109) at org.apache.flink.runtime.state.heap.AbstractHeapState.getSerializedValue(AbstractHeapState.java:101) at org.apache.flink.queryablestate.server.KvStateServerHandler.getSerializedValue(KvStateServerHandler.java:107) at org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:84) at org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:48) at org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:236) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) Caused by: java.io.IOException: Unexpected magic number 48. at org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer.deserializeKeyAndNamespace(KvStateSerializer.java:99) ... 
10 more at org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:95) at org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:48) at org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:236) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) at org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.lambda$run$0(AbstractServerHandler.java:273) at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760) at java.util.concurrent.CompletableFuture.uniWhenCompleteStage(CompletableFuture.java:778) at java.util.concurrent.CompletableFuture.whenComplete(CompletableFuture.java:2140) at org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:236) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895) at cn.unipus.flink.GetQueryableState2.main(GetQueryableState2.java:41) Caused by: java.lang.RuntimeException: Failed request 0. Caused by: java.lang.RuntimeException: Failed request 9. Caused by: java.lang.RuntimeException: Error while processing request with ID 9. 
Caused by: java.io.IOException: Unable to deserialize key and namespace. This indicates a mismatch in the key/namespace serializers used by the KvState instance and this access. at org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer.deserializeKeyAndNamespace(KvStateSerializer.java:109) at org.apache.flink.runtime.state.heap.AbstractHeapState.getSerializedValue(AbstractHeapState.java:101) at org.apache.flink.queryablestate.server.KvStateServerHandler.getSerializedValue(KvStateServerHandler.java:107) at org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:84) at org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:48) at org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:236) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) Caused by: java.io.IOException: Unexpected magic number 48. at org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer.deserializeKeyAndNamespace(KvStateSerializer.java:99) ... 
10 more at org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:95) at org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:48) at org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:236) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) at org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.lambda$run$0(AbstractServerHandler.java:273) at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760) at java.util.concurrent.CompletableFuture.uniWhenCompleteStage(CompletableFuture.java:778) at java.util.concurrent.CompletableFuture.whenComplete(CompletableFuture.java:2140) at org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:236) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) at org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.lambda$run$0(AbstractServerHandler.java:273) at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760) at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736) at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) at 
java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977) at org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.lambda$executeActionAsync$0(KvStateClientProxyHandler.java:146) at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760) at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736) at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) 使用版本1.9.1 代码如下 状态代码 import org.apache.flink.api.common.state.ValueState; import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction; import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.util.Collector; public class QueryableStateDemo2 { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); DataStream<StudentAnswer> studentAnswerDataStream = env.addSource(new SourceFunction<StudentAnswer>() { @Override public void run(SourceContext<StudentAnswer> ctx) throws Exception { StudentAnswer studentAnswer = new StudentAnswer(); studentAnswer.setCourseId(100L); studentAnswer.setTaskId("ug01"); studentAnswer.setUserId(1L); studentAnswer.setAnswer("答案"); ctx.collect(studentAnswer); while (true) { Thread.sleep(1000 * 60); } } @Override public void cancel() { } }); DataStream<LearningStrategy> 
learningStrategyDataStream = env.addSource(new SourceFunction<LearningStrategy>() { @Override public void run(SourceContext<LearningStrategy> ctx) throws Exception { LearningStrategy learningStrategy = new LearningStrategy(); learningStrategy.setCourseId(100L); learningStrategy.setTaskId("ug01"); ctx.collect(learningStrategy); while (true) { Thread.sleep(1000 * 60); } } @Override public void cancel() { } }); studentAnswerDataStream.connect(learningStrategyDataStream) .keyBy(val->val.getCourseId()+"_"+val.getTaskId() , val->val.getCourseId()+"_"+val.getTaskId()) .process(new KeyedCoProcessFunction<String, StudentAnswer, LearningStrategy, String>() { private transient ValueState<StudentAnswer> leftBuffer; private transient ValueState<LearningStrategy> rightBuffer; @Override public void open(Configuration conf) { ValueStateDescriptor<StudentAnswer> leftBufferDescriptor = new ValueStateDescriptor<>( "left_buffer", TypeInformation.of(StudentAnswer.class)); leftBufferDescriptor.setQueryable("left_buffer_query"); ValueStateDescriptor<LearningStrategy> rightBufferDescriptor = new ValueStateDescriptor<>( "right_buffer", TypeInformation.of(LearningStrategy.class)); rightBufferDescriptor.setQueryable("right_buffer_query"); leftBuffer = getRuntimeContext().getState(leftBufferDescriptor); rightBuffer = getRuntimeContext().getState(rightBufferDescriptor); } @Override public void processElement1(StudentAnswer value, Context context, Collector<String> collector) throws Exception { System.out.println("processElement1:" + value); leftBuffer.update(value); String key = context.getCurrentKey(); collector.collect(key); } @Override public void processElement2(LearningStrategy value, Context context, Collector<String> collector) throws Exception { System.out.println("processElement2:" + value); rightBuffer.update(value); String key = context.getCurrentKey(); collector.collect(key); } }).print("结果"); env.execute("State"); } } 客户端代码: import org.apache.flink.api.common.ExecutionConfig; 
import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.state.ValueState; import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.common.typeinfo.TypeHint; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.queryablestate.client.QueryableStateClient; import java.util.concurrent.CompletableFuture; public class GetQueryableState2 { public static void main(String[] args) throws Exception { QueryableStateClient client = new QueryableStateClient("localhost", 9069); ExecutionConfig executionConfig = new ExecutionConfig(); client.setExecutionConfig(executionConfig); ValueStateDescriptor<StudentAnswer> leftBufferDescriptor = new ValueStateDescriptor<>( "left_buffer", TypeInformation.of(StudentAnswer.class)); String key = "100_ug01"; JobID jobId = JobID.fromHexString("0b4ed273b44f0cff6065705c6e4ea17f"); CompletableFuture<ValueState<StudentAnswer>> resultFuture = client.getKvState(jobId, "left_buffer_query", key , BasicTypeInfo.STRING_TYPE_INFO, leftBufferDescriptor); ValueState<StudentAnswer> leftBuffer = resultFuture.get(); System.out.println("结果:"+leftBuffer.value()); // now handle the returned value // resultFuture.thenAccept(response -> // { // try { // Tuple2<String, Long> res = response.value(); // // System.out.println("Queried sum value: " + res); // // } catch (Exception e) // { // e.printStackTrace(); // } // System.out.println("Exiting future ..."); // }); Thread.sleep(1000L*10); } } Domain如下 public class BaseDomain implements Serializable { protected String bn = "2019"; protected String version = "1.0"; public String getBn() { return bn; } public void setBn(String bn) { this.bn = bn; } public String getVersion() { return version; } public void setVersion(String version) { this.version = version; } } public class LearningStrategy extends BaseDomain { private Long 
courseId; private String taskId; private Byte pushOrder = 1; public Long getCourseId() { return courseId; } public void setCourseId(Long courseId) { this.courseId = courseId; } public String getTaskId() { return taskId; } public void setTaskId(String taskId) { this.taskId = taskId; } public Byte getPushOrder() { return pushOrder; } public void setPushOrder(Byte pushOrder) { this.pushOrder = pushOrder; } @Override public String toString() { return "LearningStrategy{" + "bn='" + bn + '\'' + ", version='" + version + '\'' + ", courseId=" + courseId + ", taskId='" + taskId + '\'' + ", pushOrder=" + pushOrder + '}'; } } public class StudentAnswer extends BaseDomain{ private Long courseId; private String taskId; private Long userId; private String answer; public Long getCourseId() { return courseId; } public void setCourseId(Long courseId) { this.courseId = courseId; } public String getTaskId() { return taskId; } public void setTaskId(String taskId) { this.taskId = taskId; } public Long getUserId() { return userId; } public void setUserId(Long userId) { this.userId = userId; } public String getAnswer() { return answer; } public void setAnswer(String answer) { this.answer = answer; } @Override public String toString() { return "StudentAnswer{" + "bn='" + bn + '\'' + ", version='" + version + '\'' + ", courseId=" + courseId + ", taskId='" + taskId + '\'' + ", userId=" + userId + ", answer='" + answer + '\'' + '}'; } } |
Hi
从错误栈来看,应该是 serializer 不一致导致的,可以再检查下相应的 key/namespace serializer。 Best, Congxian chengwenfeng <[hidden email]> 于2019年11月12日周二 下午2:47写道: > 大家好: > 我在测试Querable State功能的时候,发现 > 语法 > dataStream.keyby(key).process(); 这种语法下,简单的状态和复杂的POJO都可以查询 > 但在 > studentAnswerDataStream.connect(learningStrategyDataStream) > .keyBy(val->val.getCourseId()+"_"+val.getTaskId() > , val->val.getCourseId()+"_"+val.getTaskId()) > .process() 这种语法情况下,简单的状态可以,但复杂的POJO无法反序列化回来 > > > > > 错误: > > > Exception in thread "main" java.util.concurrent.ExecutionException: > java.lang.RuntimeException: Failed request 0. > Caused by: java.lang.RuntimeException: Failed request 9. > Caused by: java.lang.RuntimeException: Error while processing request > with ID 9. Caused by: java.io.IOException: Unable to deserialize key and > namespace. This indicates a mismatch in the key/namespace serializers used > by the KvState instance and this access. > at > org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer.deserializeKeyAndNamespace(KvStateSerializer.java:109) > at > org.apache.flink.runtime.state.heap.AbstractHeapState.getSerializedValue(AbstractHeapState.java:101) > at > org.apache.flink.queryablestate.server.KvStateServerHandler.getSerializedValue(KvStateServerHandler.java:107) > at > org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:84) > at > org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:48) > at > org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:236) > at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > Caused by: 
java.io.IOException: Unexpected magic number 48. > at > org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer.deserializeKeyAndNamespace(KvStateSerializer.java:99) > ... 10 more > > > at > org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:95) > at > org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:48) > at > org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:236) > at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > > > at > org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.lambda$run$0(AbstractServerHandler.java:273) > at > java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760) > at > java.util.concurrent.CompletableFuture.uniWhenCompleteStage(CompletableFuture.java:778) > at > java.util.concurrent.CompletableFuture.whenComplete(CompletableFuture.java:2140) > at > org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:236) > at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > > > at > java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) > at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895) > at 
cn.unipus.flink.GetQueryableState2.main(GetQueryableState2.java:41) > Caused by: java.lang.RuntimeException: Failed request 0. > Caused by: java.lang.RuntimeException: Failed request 9. > Caused by: java.lang.RuntimeException: Error while processing request > with ID 9. Caused by: java.io.IOException: Unable to deserialize key and > namespace. This indicates a mismatch in the key/namespace serializers used > by the KvState instance and this access. > at > org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer.deserializeKeyAndNamespace(KvStateSerializer.java:109) > at > org.apache.flink.runtime.state.heap.AbstractHeapState.getSerializedValue(AbstractHeapState.java:101) > at > org.apache.flink.queryablestate.server.KvStateServerHandler.getSerializedValue(KvStateServerHandler.java:107) > at > org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:84) > at > org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:48) > at > org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:236) > at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > Caused by: java.io.IOException: Unexpected magic number 48. > at > org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer.deserializeKeyAndNamespace(KvStateSerializer.java:99) > ... 
10 more > > > at > org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:95) > at > org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:48) > at > org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:236) > at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > > > at > org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.lambda$run$0(AbstractServerHandler.java:273) > at > java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760) > at > java.util.concurrent.CompletableFuture.uniWhenCompleteStage(CompletableFuture.java:778) > at > java.util.concurrent.CompletableFuture.whenComplete(CompletableFuture.java:2140) > at > org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:236) > at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > > > at > org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.lambda$run$0(AbstractServerHandler.java:273) > at > java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760) > at > java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736) > at > 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) > at > java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977) > at > org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.lambda$executeActionAsync$0(KvStateClientProxyHandler.java:146) > at > java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760) > at > java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736) > at > java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > > > > > 使用版本1.9.1 > > > 代码如下 > > > 状态代码 > import org.apache.flink.api.common.state.ValueState; > import org.apache.flink.api.common.state.ValueStateDescriptor; > import org.apache.flink.api.common.typeinfo.TypeInformation; > import org.apache.flink.configuration.Configuration; > import org.apache.flink.streaming.api.datastream.DataStream; > import > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; > import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction; > import org.apache.flink.streaming.api.functions.source.SourceFunction; > import org.apache.flink.util.Collector; > > > public class QueryableStateDemo2 { > > public static void main(String[] args) throws Exception { > > StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > env.setParallelism(1); > > DataStream<StudentAnswer> studentAnswerDataStream = env.addSource(new > SourceFunction<StudentAnswer>() { > @Override > public void run(SourceContext<StudentAnswer> ctx) throws Exception { > StudentAnswer studentAnswer = new StudentAnswer(); > studentAnswer.setCourseId(100L); > studentAnswer.setTaskId("ug01"); > studentAnswer.setUserId(1L); > 
studentAnswer.setAnswer("答案"); > ctx.collect(studentAnswer); > while (true) { > Thread.sleep(1000 * 60); > } > } > > @Override > public void cancel() { > > } > }); > > DataStream<LearningStrategy> learningStrategyDataStream = > env.addSource(new SourceFunction<LearningStrategy>() { > @Override > public void run(SourceContext<LearningStrategy> ctx) throws Exception { > LearningStrategy learningStrategy = new LearningStrategy(); > learningStrategy.setCourseId(100L); > learningStrategy.setTaskId("ug01"); > ctx.collect(learningStrategy); > while (true) { > Thread.sleep(1000 * 60); > } > } > > @Override > public void cancel() { > > } > }); > > studentAnswerDataStream.connect(learningStrategyDataStream) > .keyBy(val->val.getCourseId()+"_"+val.getTaskId() > , val->val.getCourseId()+"_"+val.getTaskId()) > .process(new KeyedCoProcessFunction<String, StudentAnswer, > LearningStrategy, String>() { > > > private transient ValueState<StudentAnswer> leftBuffer; > private transient ValueState<LearningStrategy> rightBuffer; > > > @Override > public void open(Configuration conf) { > ValueStateDescriptor<StudentAnswer> leftBufferDescriptor = new > ValueStateDescriptor<>( > "left_buffer", TypeInformation.of(StudentAnswer.class)); > leftBufferDescriptor.setQueryable("left_buffer_query"); > > ValueStateDescriptor<LearningStrategy> rightBufferDescriptor = new > ValueStateDescriptor<>( > "right_buffer", TypeInformation.of(LearningStrategy.class)); > rightBufferDescriptor.setQueryable("right_buffer_query"); > > leftBuffer = getRuntimeContext().getState(leftBufferDescriptor); > rightBuffer = getRuntimeContext().getState(rightBufferDescriptor); > } > > @Override > public void processElement1(StudentAnswer value, Context context, > Collector<String> collector) throws Exception { > System.out.println("processElement1:" + value); > leftBuffer.update(value); > String key = context.getCurrentKey(); > collector.collect(key); > } > > @Override > public void processElement2(LearningStrategy value, 
Context context, > Collector<String> collector) throws Exception { > System.out.println("processElement2:" + value); > rightBuffer.update(value); > String key = context.getCurrentKey(); > collector.collect(key); > } > }).print("结果"); > > > env.execute("State"); > } > > > } > > 客户端代码: > > > import org.apache.flink.api.common.ExecutionConfig; > import org.apache.flink.api.common.JobID; > import org.apache.flink.api.common.state.ValueState; > import org.apache.flink.api.common.state.ValueStateDescriptor; > import org.apache.flink.api.common.typeinfo.BasicTypeInfo; > import org.apache.flink.api.common.typeinfo.TypeHint; > import org.apache.flink.api.common.typeinfo.TypeInformation; > import org.apache.flink.api.java.tuple.Tuple2; > import org.apache.flink.queryablestate.client.QueryableStateClient; > > import java.util.concurrent.CompletableFuture; > > public class GetQueryableState2 { > > public static void main(String[] args) throws Exception > { > > > QueryableStateClient client = new QueryableStateClient("localhost", 9069); > > ExecutionConfig executionConfig = new ExecutionConfig(); > client.setExecutionConfig(executionConfig); > > > ValueStateDescriptor<StudentAnswer> leftBufferDescriptor = new > ValueStateDescriptor<>( > "left_buffer", TypeInformation.of(StudentAnswer.class)); > > > String key = "100_ug01"; > > JobID jobId = JobID.fromHexString("0b4ed273b44f0cff6065705c6e4ea17f"); > > CompletableFuture<ValueState<StudentAnswer>> resultFuture = > client.getKvState(jobId, "left_buffer_query", key > , BasicTypeInfo.STRING_TYPE_INFO, leftBufferDescriptor); > > > ValueState<StudentAnswer> leftBuffer = resultFuture.get(); > System.out.println("结果:"+leftBuffer.value()); > > // now handle the returned value > // resultFuture.thenAccept(response -> > // { > // try { > // Tuple2<String, Long> res = response.value(); > // > // System.out.println("Queried sum value: " + res); > // > // } catch (Exception e) > // { > // e.printStackTrace(); > // } > // 
System.out.println("Exiting future ..."); > // }); > Thread.sleep(1000L*10); > } > > } > > > > > Domain如下 > public class BaseDomain implements Serializable { > > protected String bn = "2019"; > protected String version = "1.0"; > > public String getBn() { > return bn; > } > > public void setBn(String bn) { > this.bn = bn; > } > > public String getVersion() { > return version; > } > > public void setVersion(String version) { > this.version = version; > } > } > > > public class LearningStrategy extends BaseDomain { > private Long courseId; > private String taskId; > private Byte pushOrder = 1; > > public Long getCourseId() { > return courseId; > } > > public void setCourseId(Long courseId) { > this.courseId = courseId; > } > > public String getTaskId() { > return taskId; > } > > public void setTaskId(String taskId) { > this.taskId = taskId; > } > > public Byte getPushOrder() { > return pushOrder; > } > > public void setPushOrder(Byte pushOrder) { > this.pushOrder = pushOrder; > } > > @Override > public String toString() { > return "LearningStrategy{" + > "bn='" + bn + '\'' + > ", version='" + version + '\'' + > ", courseId=" + courseId + > ", taskId='" + taskId + '\'' + > ", pushOrder=" + pushOrder + > '}'; > } > } > > > public class StudentAnswer extends BaseDomain{ > private Long courseId; > private String taskId; > private Long userId; > private String answer; > > public Long getCourseId() { > return courseId; > } > > public void setCourseId(Long courseId) { > this.courseId = courseId; > } > > public String getTaskId() { > return taskId; > } > > public void setTaskId(String taskId) { > this.taskId = taskId; > } > > public Long getUserId() { > return userId; > } > > public void setUserId(Long userId) { > this.userId = userId; > } > > public String getAnswer() { > return answer; > } > > public void setAnswer(String answer) { > this.answer = answer; > } > > @Override > public String toString() { > return "StudentAnswer{" + > "bn='" + bn + '\'' + > ", version='" + 
version + '\'' + > ", courseId=" + courseId + > ", taskId='" + taskId + '\'' + > ", userId=" + userId + > ", answer='" + answer + '\'' + > '}'; > } > } |
Free forum by Nabble | Edit this page |