Queryable State 查询反序列化问题

classic Classic list List threaded Threaded
2 messages Options
Reply | Threaded
Open this post in threaded view
|

Queryable State 查询反序列化问题

chengwenfeng
大家好:
我在测试Querable State功能的时候,发现
语法
        dataStream.keyby(key).process();  这种语法下,简单的状态和复杂的POJO都可以查询
但在
       studentAnswerDataStream.connect(learningStrategyDataStream)
                .keyBy(val->val.getCourseId()+"_"+val.getTaskId()
                        , val->val.getCourseId()+"_"+val.getTaskId())
                .process()  这种语法情况下,简单的状态可以,但复杂的POJO无法反序列化回来




错误:


Exception in thread "main" java.util.concurrent.ExecutionException: java.lang.RuntimeException: Failed request 0.
 Caused by: java.lang.RuntimeException: Failed request 9.
 Caused by: java.lang.RuntimeException: Error while processing request with ID 9. Caused by: java.io.IOException: Unable to deserialize key and namespace. This indicates a mismatch in the key/namespace serializers used by the KvState instance and this access.
at org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer.deserializeKeyAndNamespace(KvStateSerializer.java:109)
at org.apache.flink.runtime.state.heap.AbstractHeapState.getSerializedValue(AbstractHeapState.java:101)
at org.apache.flink.queryablestate.server.KvStateServerHandler.getSerializedValue(KvStateServerHandler.java:107)
at org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:84)
at org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:48)
at org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:236)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: Unexpected magic number 48.
at org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer.deserializeKeyAndNamespace(KvStateSerializer.java:99)
... 10 more


at org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:95)
at org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:48)
at org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:236)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)


at org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.lambda$run$0(AbstractServerHandler.java:273)
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
at java.util.concurrent.CompletableFuture.uniWhenCompleteStage(CompletableFuture.java:778)
at java.util.concurrent.CompletableFuture.whenComplete(CompletableFuture.java:2140)
at org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:236)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)


at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
at cn.unipus.flink.GetQueryableState2.main(GetQueryableState2.java:41)
Caused by: java.lang.RuntimeException: Failed request 0.
 Caused by: java.lang.RuntimeException: Failed request 9.
 Caused by: java.lang.RuntimeException: Error while processing request with ID 9. Caused by: java.io.IOException: Unable to deserialize key and namespace. This indicates a mismatch in the key/namespace serializers used by the KvState instance and this access.
at org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer.deserializeKeyAndNamespace(KvStateSerializer.java:109)
at org.apache.flink.runtime.state.heap.AbstractHeapState.getSerializedValue(AbstractHeapState.java:101)
at org.apache.flink.queryablestate.server.KvStateServerHandler.getSerializedValue(KvStateServerHandler.java:107)
at org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:84)
at org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:48)
at org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:236)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: Unexpected magic number 48.
at org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer.deserializeKeyAndNamespace(KvStateSerializer.java:99)
... 10 more


at org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:95)
at org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:48)
at org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:236)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)


at org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.lambda$run$0(AbstractServerHandler.java:273)
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
at java.util.concurrent.CompletableFuture.uniWhenCompleteStage(CompletableFuture.java:778)
at java.util.concurrent.CompletableFuture.whenComplete(CompletableFuture.java:2140)
at org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:236)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)


at org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.lambda$run$0(AbstractServerHandler.java:273)
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
at org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.lambda$executeActionAsync$0(KvStateClientProxyHandler.java:146)
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)




使用版本1.9.1


代码如下


状态代码
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Collector;


public class QueryableStateDemo2 {

 public static void main(String[] args) throws Exception {

 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 env.setParallelism(1);

 DataStream<StudentAnswer> studentAnswerDataStream = env.addSource(new SourceFunction<StudentAnswer>() {
 @Override
 public void run(SourceContext<StudentAnswer> ctx) throws Exception {
 StudentAnswer studentAnswer = new StudentAnswer();
 studentAnswer.setCourseId(100L);
 studentAnswer.setTaskId("ug01");
 studentAnswer.setUserId(1L);
 studentAnswer.setAnswer("答案");
 ctx.collect(studentAnswer);
 while (true) {
 Thread.sleep(1000 * 60);
 }
 }

 @Override
 public void cancel() {

 }
 });

 DataStream<LearningStrategy> learningStrategyDataStream = env.addSource(new SourceFunction<LearningStrategy>() {
 @Override
 public void run(SourceContext<LearningStrategy> ctx) throws Exception {
 LearningStrategy learningStrategy = new LearningStrategy();
 learningStrategy.setCourseId(100L);
 learningStrategy.setTaskId("ug01");
 ctx.collect(learningStrategy);
 while (true) {
 Thread.sleep(1000 * 60);
 }
 }

 @Override
 public void cancel() {

 }
 });

 studentAnswerDataStream.connect(learningStrategyDataStream)
 .keyBy(val->val.getCourseId()+"_"+val.getTaskId()
 , val->val.getCourseId()+"_"+val.getTaskId())
 .process(new KeyedCoProcessFunction<String, StudentAnswer, LearningStrategy, String>() {


 private transient ValueState<StudentAnswer> leftBuffer;
 private transient ValueState<LearningStrategy> rightBuffer;


 @Override
 public void open(Configuration conf) {
 ValueStateDescriptor<StudentAnswer> leftBufferDescriptor = new ValueStateDescriptor<>(
 "left_buffer", TypeInformation.of(StudentAnswer.class));
 leftBufferDescriptor.setQueryable("left_buffer_query");

 ValueStateDescriptor<LearningStrategy> rightBufferDescriptor = new ValueStateDescriptor<>(
 "right_buffer", TypeInformation.of(LearningStrategy.class));
 rightBufferDescriptor.setQueryable("right_buffer_query");

 leftBuffer = getRuntimeContext().getState(leftBufferDescriptor);
 rightBuffer = getRuntimeContext().getState(rightBufferDescriptor);
 }

 @Override
 public void processElement1(StudentAnswer value, Context context, Collector<String> collector) throws Exception {
 System.out.println("processElement1:" + value);
 leftBuffer.update(value);
 String key = context.getCurrentKey();
 collector.collect(key);
 }

 @Override
 public void processElement2(LearningStrategy value, Context context, Collector<String> collector) throws Exception {
 System.out.println("processElement2:" + value);
 rightBuffer.update(value);
 String key = context.getCurrentKey();
 collector.collect(key);
 }
 }).print("结果");


 env.execute("State");
 }


}

客户端代码:


import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.queryablestate.client.QueryableStateClient;

import java.util.concurrent.CompletableFuture;

public class GetQueryableState2 {

 public static void main(String[] args) throws Exception
 {


 QueryableStateClient client = new QueryableStateClient("localhost", 9069);

 ExecutionConfig executionConfig = new ExecutionConfig();
 client.setExecutionConfig(executionConfig);


 ValueStateDescriptor<StudentAnswer> leftBufferDescriptor = new ValueStateDescriptor<>(
 "left_buffer", TypeInformation.of(StudentAnswer.class));


 String key = "100_ug01";

 JobID jobId = JobID.fromHexString("0b4ed273b44f0cff6065705c6e4ea17f");

 CompletableFuture<ValueState<StudentAnswer>> resultFuture =
 client.getKvState(jobId, "left_buffer_query", key
 , BasicTypeInfo.STRING_TYPE_INFO, leftBufferDescriptor);


 ValueState<StudentAnswer> leftBuffer = resultFuture.get();
 System.out.println("结果:"+leftBuffer.value());

 // now handle the returned value
// resultFuture.thenAccept(response ->
// {
// try {
// Tuple2<String, Long> res = response.value();
//
// System.out.println("Queried sum value: " + res);
//
// } catch (Exception e)
// {
// e.printStackTrace();
// }
// System.out.println("Exiting future ...");
// });
 Thread.sleep(1000L*10);
 }

}




Domain如下
public class BaseDomain implements Serializable {

 protected String bn = "2019";
 protected String version = "1.0";

 public String getBn() {
 return bn;
 }

 public void setBn(String bn) {
 this.bn = bn;
 }

 public String getVersion() {
 return version;
 }

 public void setVersion(String version) {
 this.version = version;
 }
}


public class LearningStrategy extends BaseDomain {
 private Long courseId;
 private String taskId;
 private Byte pushOrder = 1;

 public Long getCourseId() {
 return courseId;
 }

 public void setCourseId(Long courseId) {
 this.courseId = courseId;
 }

 public String getTaskId() {
 return taskId;
 }

 public void setTaskId(String taskId) {
 this.taskId = taskId;
 }

 public Byte getPushOrder() {
 return pushOrder;
 }

 public void setPushOrder(Byte pushOrder) {
 this.pushOrder = pushOrder;
 }

 @Override
 public String toString() {
 return "LearningStrategy{" +
 "bn='" + bn + '\'' +
 ", version='" + version + '\'' +
 ", courseId=" + courseId +
 ", taskId='" + taskId + '\'' +
 ", pushOrder=" + pushOrder +
 '}';
 }
}


public class StudentAnswer extends BaseDomain{
 private Long courseId;
 private String taskId;
 private Long userId;
 private String answer;

 public Long getCourseId() {
 return courseId;
 }

 public void setCourseId(Long courseId) {
 this.courseId = courseId;
 }

 public String getTaskId() {
 return taskId;
 }

 public void setTaskId(String taskId) {
 this.taskId = taskId;
 }

 public Long getUserId() {
 return userId;
 }

 public void setUserId(Long userId) {
 this.userId = userId;
 }

 public String getAnswer() {
 return answer;
 }

 public void setAnswer(String answer) {
 this.answer = answer;
 }

 @Override
 public String toString() {
 return "StudentAnswer{" +
 "bn='" + bn + '\'' +
 ", version='" + version + '\'' +
 ", courseId=" + courseId +
 ", taskId='" + taskId + '\'' +
 ", userId=" + userId +
 ", answer='" + answer + '\'' +
 '}';
 }
}
Reply | Threaded
Open this post in threaded view
|

Re: Queryable State 查询反序列化问题

Congxian Qiu
Hi

从错误栈来看,应该是 serializer 不一致导致的,可以再检查下相应的 key/namespace serialzier

Best,
Congxian


chengwenfeng <[hidden email]> 于2019年11月12日周二 下午2:47写道:

> 大家好:
> 我在测试Querable State功能的时候,发现
> 语法
>         dataStream.keyby(key).process();  这种语法下,简单的状态和复杂的POJO都可以查询
> 但在
>        studentAnswerDataStream.connect(learningStrategyDataStream)
>                 .keyBy(val->val.getCourseId()+"_"+val.getTaskId()
>                         , val->val.getCourseId()+"_"+val.getTaskId())
>                 .process()  这种语法情况下,简单的状态可以,但复杂的POJO无法反序列化回来
>
>
>
>
> 错误:
>
>
> Exception in thread "main" java.util.concurrent.ExecutionException:
> java.lang.RuntimeException: Failed request 0.
>  Caused by: java.lang.RuntimeException: Failed request 9.
>  Caused by: java.lang.RuntimeException: Error while processing request
> with ID 9. Caused by: java.io.IOException: Unable to deserialize key and
> namespace. This indicates a mismatch in the key/namespace serializers used
> by the KvState instance and this access.
> at
> org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer.deserializeKeyAndNamespace(KvStateSerializer.java:109)
> at
> org.apache.flink.runtime.state.heap.AbstractHeapState.getSerializedValue(AbstractHeapState.java:101)
> at
> org.apache.flink.queryablestate.server.KvStateServerHandler.getSerializedValue(KvStateServerHandler.java:107)
> at
> org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:84)
> at
> org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:48)
> at
> org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:236)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.IOException: Unexpected magic number 48.
> at
> org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer.deserializeKeyAndNamespace(KvStateSerializer.java:99)
> ... 10 more
>
>
> at
> org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:95)
> at
> org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:48)
> at
> org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:236)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
>
>
> at
> org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.lambda$run$0(AbstractServerHandler.java:273)
> at
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
> at
> java.util.concurrent.CompletableFuture.uniWhenCompleteStage(CompletableFuture.java:778)
> at
> java.util.concurrent.CompletableFuture.whenComplete(CompletableFuture.java:2140)
> at
> org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:236)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
>
>
> at
> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
> at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
> at cn.unipus.flink.GetQueryableState2.main(GetQueryableState2.java:41)
> Caused by: java.lang.RuntimeException: Failed request 0.
>  Caused by: java.lang.RuntimeException: Failed request 9.
>  Caused by: java.lang.RuntimeException: Error while processing request
> with ID 9. Caused by: java.io.IOException: Unable to deserialize key and
> namespace. This indicates a mismatch in the key/namespace serializers used
> by the KvState instance and this access.
> at
> org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer.deserializeKeyAndNamespace(KvStateSerializer.java:109)
> at
> org.apache.flink.runtime.state.heap.AbstractHeapState.getSerializedValue(AbstractHeapState.java:101)
> at
> org.apache.flink.queryablestate.server.KvStateServerHandler.getSerializedValue(KvStateServerHandler.java:107)
> at
> org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:84)
> at
> org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:48)
> at
> org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:236)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.IOException: Unexpected magic number 48.
> at
> org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer.deserializeKeyAndNamespace(KvStateSerializer.java:99)
> ... 10 more
>
>
> at
> org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:95)
> at
> org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:48)
> at
> org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:236)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
>
>
> at
> org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.lambda$run$0(AbstractServerHandler.java:273)
> at
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
> at
> java.util.concurrent.CompletableFuture.uniWhenCompleteStage(CompletableFuture.java:778)
> at
> java.util.concurrent.CompletableFuture.whenComplete(CompletableFuture.java:2140)
> at
> org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:236)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
>
>
> at
> org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.lambda$run$0(AbstractServerHandler.java:273)
> at
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
> at
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
> at
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
> at
> java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
> at
> org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.lambda$executeActionAsync$0(KvStateClientProxyHandler.java:146)
> at
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
> at
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
> at
> java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
>
>
>
>
> 使用版本1.9.1
>
>
> 代码如下
>
>
> 状态代码
> import org.apache.flink.api.common.state.ValueState;
> import org.apache.flink.api.common.state.ValueStateDescriptor;
> import org.apache.flink.api.common.typeinfo.TypeInformation;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
> import org.apache.flink.streaming.api.functions.source.SourceFunction;
> import org.apache.flink.util.Collector;
>
>
> public class QueryableStateDemo2 {
>
>  public static void main(String[] args) throws Exception {
>
>  StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
>  env.setParallelism(1);
>
>  DataStream<StudentAnswer> studentAnswerDataStream = env.addSource(new
> SourceFunction<StudentAnswer>() {
>  @Override
>  public void run(SourceContext<StudentAnswer> ctx) throws Exception {
>  StudentAnswer studentAnswer = new StudentAnswer();
>  studentAnswer.setCourseId(100L);
>  studentAnswer.setTaskId("ug01");
>  studentAnswer.setUserId(1L);
>  studentAnswer.setAnswer("答案");
>  ctx.collect(studentAnswer);
>  while (true) {
>  Thread.sleep(1000 * 60);
>  }
>  }
>
>  @Override
>  public void cancel() {
>
>  }
>  });
>
>  DataStream<LearningStrategy> learningStrategyDataStream =
> env.addSource(new SourceFunction<LearningStrategy>() {
>  @Override
>  public void run(SourceContext<LearningStrategy> ctx) throws Exception {
>  LearningStrategy learningStrategy = new LearningStrategy();
>  learningStrategy.setCourseId(100L);
>  learningStrategy.setTaskId("ug01");
>  ctx.collect(learningStrategy);
>  while (true) {
>  Thread.sleep(1000 * 60);
>  }
>  }
>
>  @Override
>  public void cancel() {
>
>  }
>  });
>
>  studentAnswerDataStream.connect(learningStrategyDataStream)
>  .keyBy(val->val.getCourseId()+"_"+val.getTaskId()
>  , val->val.getCourseId()+"_"+val.getTaskId())
>  .process(new KeyedCoProcessFunction<String, StudentAnswer,
> LearningStrategy, String>() {
>
>
>  private transient ValueState<StudentAnswer> leftBuffer;
>  private transient ValueState<LearningStrategy> rightBuffer;
>
>
>  @Override
>  public void open(Configuration conf) {
>  ValueStateDescriptor<StudentAnswer> leftBufferDescriptor = new
> ValueStateDescriptor<>(
>  "left_buffer", TypeInformation.of(StudentAnswer.class));
>  leftBufferDescriptor.setQueryable("left_buffer_query");
>
>  ValueStateDescriptor<LearningStrategy> rightBufferDescriptor = new
> ValueStateDescriptor<>(
>  "right_buffer", TypeInformation.of(LearningStrategy.class));
>  rightBufferDescriptor.setQueryable("right_buffer_query");
>
>  leftBuffer = getRuntimeContext().getState(leftBufferDescriptor);
>  rightBuffer = getRuntimeContext().getState(rightBufferDescriptor);
>  }
>
>  @Override
>  public void processElement1(StudentAnswer value, Context context,
> Collector<String> collector) throws Exception {
>  System.out.println("processElement1:" + value);
>  leftBuffer.update(value);
>  String key = context.getCurrentKey();
>  collector.collect(key);
>  }
>
>  @Override
>  public void processElement2(LearningStrategy value, Context context,
> Collector<String> collector) throws Exception {
>  System.out.println("processElement2:" + value);
>  rightBuffer.update(value);
>  String key = context.getCurrentKey();
>  collector.collect(key);
>  }
>  }).print("结果");
>
>
>  env.execute("State");
>  }
>
>
> }
>
> 客户端代码:
>
>
> import org.apache.flink.api.common.ExecutionConfig;
> import org.apache.flink.api.common.JobID;
> import org.apache.flink.api.common.state.ValueState;
> import org.apache.flink.api.common.state.ValueStateDescriptor;
> import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
> import org.apache.flink.api.common.typeinfo.TypeHint;
> import org.apache.flink.api.common.typeinfo.TypeInformation;
> import org.apache.flink.api.java.tuple.Tuple2;
> import org.apache.flink.queryablestate.client.QueryableStateClient;
>
> import java.util.concurrent.CompletableFuture;
>
> public class GetQueryableState2 {
>
>  public static void main(String[] args) throws Exception
>  {
>
>
>  QueryableStateClient client = new QueryableStateClient("localhost", 9069);
>
>  ExecutionConfig executionConfig = new ExecutionConfig();
>  client.setExecutionConfig(executionConfig);
>
>
>  ValueStateDescriptor<StudentAnswer> leftBufferDescriptor = new
> ValueStateDescriptor<>(
>  "left_buffer", TypeInformation.of(StudentAnswer.class));
>
>
>  String key = "100_ug01";
>
>  JobID jobId = JobID.fromHexString("0b4ed273b44f0cff6065705c6e4ea17f");
>
>  CompletableFuture<ValueState<StudentAnswer>> resultFuture =
>  client.getKvState(jobId, "left_buffer_query", key
>  , BasicTypeInfo.STRING_TYPE_INFO, leftBufferDescriptor);
>
>
>  ValueState<StudentAnswer> leftBuffer = resultFuture.get();
>  System.out.println("结果:"+leftBuffer.value());
>
>  // now handle the returned value
> // resultFuture.thenAccept(response ->
> // {
> // try {
> // Tuple2<String, Long> res = response.value();
> //
> // System.out.println("Queried sum value: " + res);
> //
> // } catch (Exception e)
> // {
> // e.printStackTrace();
> // }
> // System.out.println("Exiting future ...");
> // });
>  Thread.sleep(1000L*10);
>  }
>
> }
>
>
>
>
> Domain如下
> public class BaseDomain implements Serializable {
>
>  protected String bn = "2019";
>  protected String version = "1.0";
>
>  public String getBn() {
>  return bn;
>  }
>
>  public void setBn(String bn) {
>  this.bn = bn;
>  }
>
>  public String getVersion() {
>  return version;
>  }
>
>  public void setVersion(String version) {
>  this.version = version;
>  }
> }
>
>
> public class LearningStrategy extends BaseDomain {
>  private Long courseId;
>  private String taskId;
>  private Byte pushOrder = 1;
>
>  public Long getCourseId() {
>  return courseId;
>  }
>
>  public void setCourseId(Long courseId) {
>  this.courseId = courseId;
>  }
>
>  public String getTaskId() {
>  return taskId;
>  }
>
>  public void setTaskId(String taskId) {
>  this.taskId = taskId;
>  }
>
>  public Byte getPushOrder() {
>  return pushOrder;
>  }
>
>  public void setPushOrder(Byte pushOrder) {
>  this.pushOrder = pushOrder;
>  }
>
>  @Override
>  public String toString() {
>  return "LearningStrategy{" +
>  "bn='" + bn + '\'' +
>  ", version='" + version + '\'' +
>  ", courseId=" + courseId +
>  ", taskId='" + taskId + '\'' +
>  ", pushOrder=" + pushOrder +
>  '}';
>  }
> }
>
>
> public class StudentAnswer extends BaseDomain{
>  private Long courseId;
>  private String taskId;
>  private Long userId;
>  private String answer;
>
>  public Long getCourseId() {
>  return courseId;
>  }
>
>  public void setCourseId(Long courseId) {
>  this.courseId = courseId;
>  }
>
>  public String getTaskId() {
>  return taskId;
>  }
>
>  public void setTaskId(String taskId) {
>  this.taskId = taskId;
>  }
>
>  public Long getUserId() {
>  return userId;
>  }
>
>  public void setUserId(Long userId) {
>  this.userId = userId;
>  }
>
>  public String getAnswer() {
>  return answer;
>  }
>
>  public void setAnswer(String answer) {
>  this.answer = answer;
>  }
>
>  @Override
>  public String toString() {
>  return "StudentAnswer{" +
>  "bn='" + bn + '\'' +
>  ", version='" + version + '\'' +
>  ", courseId=" + courseId +
>  ", taskId='" + taskId + '\'' +
>  ", userId=" + userId +
>  ", answer='" + answer + '\'' +
>  '}';
>  }
> }