One fairly obvious problem is in the UDF definition: your sink table declares a STRING column, but your SQL SELECTs the result of the udf, so the UDF should also return a STRING, i.e. result_type=DataTypes.STRING(). Also make sure the udf really returns a string, not a JSON object.
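For illustration, a minimal sketch of such a udf (not the code from this thread; the 'event' field name is taken from the sample data quoted below, and the parsing details are an assumption):

import json
from pyflink.table import DataTypes
from pyflink.table.udf import udf

@udf(input_types=DataTypes.STRING(), result_type=DataTypes.STRING())
def extract_event(message):
    # json.loads yields a Python dict; return a single field coerced to str,
    # never the dict itself, so the result matches a STRING sink column.
    try:
        return str(json.loads(message).get('event', ''))
    except (ValueError, TypeError):
        return ''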
Best,
Jincheng
Thanks for the reply.
Following the hint in your mail, I tried the following:

@udf(input_types=DataTypes.STRING(), result_type=DataTypes.STRING())
def str_add(str_name):
    return '1'
table_env.register_function("str_add", str_add)
table_env.sql_update("insert into flink_sinktable_ad_test_1 \
    select \
        str_add(topicid) AS topicid \
    from \
        flink_sourcetable_ad_test \
")

Goal: check in the simplest possible way whether the udf takes effect at all.
Result: still no data flowing in.
Other checks: I verified the data flow with the udf removed; it is fine.
So could you take another look? Or how should I trace this further?

--------
all code below:

from pyflink.datastream import StreamExecutionEnvironment, CheckpointingMode, TimeCharacteristic
from pyflink.table import StreamTableEnvironment, EnvironmentSettings, TableSink, TableConfig, DataTypes
from pyflink.table.descriptors import Schema, Kafka, Rowtime, ConnectTableDescriptor, CustomConnectorDescriptor, Json
from pyflink.common import RestartStrategies
from pyflink.table.udf import udf
import json

env = StreamExecutionEnvironment.get_execution_environment()
## container settings
env.set_parallelism(12)
env.set_restart_strategy(RestartStrategies.fixed_delay_restart(50, 6))
## use the blink planner
environment_settings = EnvironmentSettings.new_instance().use_blink_planner().in_streaming_mode().build()
table_env = StreamTableEnvironment.create(env, environment_settings=environment_settings)

table_env.sql_update("CREATE TABLE flink_sourcetable_ad_test ( \
    host STRING, \
    type STRING, \
    topicid STRING, \
    message STRING, \
    proctime as PROCTIME() \
) WITH ( \
    'connector.type' = 'kafka', \
    'connector.version' = 'universal', \
    'connector.topic' = 'advertise_module', \
    'connector.properties.zookeeper.connect' = 'localhost:2181', \
    'connector.properties.bootstrap.servers' = '172.25.80.134:9092', \
    'connector.properties.group.id' = 'flink_1.10_test_source', \
    'connector.startup-mode' = 'latest-offset', \
    'format.type' = 'json' \
)")

table_env.sql_update("CREATE TABLE flink_sinktable_ad_test_1 ( \
    topicid STRING \
) WITH ( \
    'connector.type' = 'kafka', \
    'connector.version' = 'universal', \
    'connector.topic' = 'recommend_user_concern_test', \
    'connector.properties.zookeeper.connect' = 'localhost:2181', \
    'connector.properties.bootstrap.servers' = '172.25.82.77:9092', \
    'connector.properties.group.id' = 'flink_1.10_test_sink', \
    'connector.startup-mode' = 'latest-offset', \
    'connector.properties.retries' = '3', \
    'format.type' = 'json', \
    'connector.properties.update_mode' = 'append' \
)")

@udf(input_types=DataTypes.STRING(), result_type=DataTypes.STRING())
def str_add(str_name):
    return '1'
table_env.register_function("str_add", str_add)
#table_env.register_function("str_add", udf(lambda i: i + '1', DataTypes.STRING(), DataTypes.STRING()))

table_env.sql_update("insert into flink_sinktable_ad_test_1 \
    select \
        str_add(topicid) AS topicid \
    from \
        flink_sourcetable_ad_test \
")
table_env.execute('flink_1.10_test')

------
> On Mar 26, 2020, at 5:55 PM, jincheng sun <[hidden email]> wrote:
>
> One fairly obvious problem is in the UDF definition: your sink table declares a STRING column, but your SQL SELECTs the result of the udf, so the UDF should also return a STRING, i.e. result_type=DataTypes.STRING(). Also make sure the udf really returns a string, not a JSON object.
>
> Best,
> Jincheng
>
>
> WuPangang <[hidden email]> wrote on Thu, Mar 26, 2020 at 5:24 PM:
> Data as below:
> {"host":"172.25.69.145","@timestamp":"2020-03-23T09:21:16.315Z","@version":"1","offset":532178261,"logmark":"advertise-api","type":"squirrel","path":"/home/logs/app/action_log/advertise_api/AdStatistic.2020-03-23.log","message":"{\"date_time\":\"2020-03-23 17:21:15\",\"serverTime\":1584955275,\"currentTime\":\"1584955274\",\"distinct_id\":\"734232168\",\"device_id\":\"ad2c15b7cf910cc6\",\"event\":\"view_material\",\"click_pos\":\"ad_view_avatar\",\"material_name\":\"\\u5934\\u50cf\\u66dd\\u5149\",\"phase\":\"prod\",\"source\":\"phone\",\"client_v\":\"2.100\",\"platform\":\"android\",\"ip\":\"39.84.23.81\",\"network\":\"WIFI\",\"idfa\":\"867179032526091\",\"unique_device_id\":\"ad2c15b7cf910cc6\",\"manufacturer\":\"HUAWEI\",\"model\":\"PRA-AL00X\",\"carrier\":\"\",\"local_dns\":\"0.0,0.0\",\"user_id\":\"734232168\",\"is_login\":1,\"url_path\":\"http:\\/\\/down-ddz.734399.com\\/download\\/shuabao\\/shuabao10-awddz-release-hl.apk\",\"country\":\"\",\"province\":\"\",\"city\":\"\",\"setup_source\":\"androiddefault\",\"seller_id\":\"139091\",\"plan_id\":\"380141\",\"plan_name\":\"HL-10-12.27-\\u5927\\u80f8-\\u6e38\\u620f\",\"put_platform_id\":\"25\",\"put_ad_type_id\":\"27\",\"put_position_id\":\"0\",\"put_sell_type_id\":\"0\",\"material_id\":\"378120\",\"show_id\":\"SMALL_VIDEO_6948314\",\"put_start_time\":\"1577808000\",\"plan_params\":\"advertise-380141-378120\",\"download_app_name\":\"\\u7231\\u73a9\\u6597\\u5730\\u4e3b\",\"ad_type\":\"0\",\"put_source\":\"1\",\"is_ad_recommend\":\"\",\"third_video_url\":\"\",\"third_img_url\":\"\",\"ua\":\"JuMei\\/ (PRA-AL00X; Android; Android OS ; 8.0.0; zh) ApacheHttpClient\\/4.0\",\"platform_v\":\"2.100\",\"played_time\":\"\",\"video_time\":\"\",\"status\":1,\"target_link\":\"http:\\/\\/down-ddz.734399.com\\/download\\/shuabao\\/shuabao10-awddz-release-hl.apk\",\"ad_material_title\":\"\\u5de5\\u8d44\\u6ca13W\\uff0c\\u5c31\\u73a9\\u8fd9\\u6597\\u5730\\u4e3b\\uff0c\\u6bcf\\u5929\\u90fd\\u80fd\\u9886\\u7ea2\\u5305~\",\"ad_material_desc\":\"\\u73a9\\u6597\\u5730\\u4e3b\\uff0c\\u7ea2\\u5305\\u79d2\\u4f53\\u73b0\",\"icon_url\":\"http:\\/\\/p12.jmstatic.com\\/adv\\/\\/material\\/20190920\\/jmadv5d849b203750f.png\",\"button_text\":\"\\u7acb\\u5373\\u4e0b\\u8f7d\",\"material_type\":\"video\",\"third_app_id\":\"\",\"third_pos_id\":\"\",\"download_apk\":\"\",\"package_name\":\"\",\"h5_url\":\"\",\"message\":\"\",\"unit_price\":\"4.15\",\"balance_type\":\"3\",\"ecpm\":\"15.189\",\"accumulate_ctr\":\"0.366\",\"pre_ctr\":\"0.366\",\"real_unit_price\":\"0\",\"pre_charge\":\"0\",\"pre_change\":\"0\",\"subsidy_type\":\"\",\"subsidy_ratio\":\"\",\"suppress_type\":\"0\",\"suppress_ratio\":\"0\",\"two_ecpm\":\"15.165\",\"real_ecpm\":\"0\",\"real_unit_price_threshold\":\"0\",\"plan_accumulate_pv\":\"0\",\"plan_accumulate_cpv\":\"0\",\"plan_accumulate_change\":\"0\",\"request_id\":\"ad2c15b7cf910cc6-1584955121.768552967\",\"ad_activity_type\":\"0\",\"one_cost\":\"4.15\",\"bid\":\"0.366\",\"real_spr\":\"0.998419\",\"one_ecpm\":\"15.189\",\"es_one\":\"0,16,\",\"es_two\":\"0\",\"es_three\":\"0\",\"es_four\":\"0\",\"age\":\"1020\",\"sex\":\"0\",\"one_cost_expend\":\"4.15\",\"two_cost_expend\":\"4.153438\",\"reward_source\":\"\",\"provide\":\"\"}","topicid":"advertise_module","project":"advertise-api"}
> Problem:
> The data is nested JSON, and the format of the core field message cannot be handled directly with the Table API's JSON facilities.
> My idea for a fix: handle it in a udf with json.loads.
> Problem hit in practice: after the job is submitted, the dashboard shows both bytes received and records received at 0; the downstream sink is Kafka, and consuming the downstream topic yields no data either.
>
>
> Code as below:
> from pyflink.datastream import StreamExecutionEnvironment, CheckpointingMode, TimeCharacteristic
> from pyflink.table import StreamTableEnvironment, EnvironmentSettings, TableSink, TableConfig, DataTypes
> from pyflink.table.descriptors import Schema, Kafka, Rowtime, ConnectTableDescriptor, CustomConnectorDescriptor, Json
> from pyflink.common import RestartStrategies
> from pyflink.table.udf import udf
> import json
>
> env = StreamExecutionEnvironment.get_execution_environment()
> #env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
> ## checkpoint settings
> #env.enable_checkpointing(300000)
> #env.get_checkpoint_config().set_checkpointing_mode(CheckpointingMode().EXACTLY_ONCE)
> #env.get_checkpoint_config().set_min_pause_between_checkpoints(30000)
> #env.get_checkpoint_config().set_checkpoint_timeout(60000)
> #env.get_checkpoint_config().set_max_concurrent_checkpoints(1)
> #env.get_checkpoint_config().set_prefer_checkpoint_for_recovery(False)
> ## container settings
> env.set_parallelism(12)
> env.set_restart_strategy(RestartStrategies.fixed_delay_restart(50, 6))
> ## use the blink planner
> environment_settings = EnvironmentSettings.new_instance().use_blink_planner().in_streaming_mode().build()
> table_env = StreamTableEnvironment.create(env, environment_settings=environment_settings)
>
> table_env.sql_update("CREATE TABLE flink_sourcetable_ad_test ( \
>     host STRING, \
>     type STRING, \
>     topicid STRING, \
>     message STRING, \
>     proctime as PROCTIME() \
> ) WITH ( \
>     'connector.type' = 'kafka', \
>     'connector.version' = 'universal', \
>     'connector.topic' = 'advertise_module', \
>     'connector.properties.zookeeper.connect' = 'localhost:2181', \
>     'connector.properties.bootstrap.servers' = 'localhost:9092', \
>     'connector.properties.group.id' = 'flink_1.10_test_source', \
>     'connector.startup-mode' = 'latest-offset', \
>     'format.type' = 'json', \
>     'format.derive-schema' = 'true' \
> )")
>
> table_env.sql_update("CREATE TABLE flink_sinktable_ad_test ( \
>     message STRING \
> ) WITH ( \
>     'connector.type' = 'kafka', \
>     'connector.version' = 'universal', \
>     'connector.topic' = 'recommend_user_concern_test', \
>     'connector.properties.zookeeper.connect' = 'localhost:2181', \
>     'connector.properties.bootstrap.servers' = 'localhost:9092', \
>     'connector.properties.group.id' = 'flink_1.10_test_sink', \
>     'connector.startup-mode' = 'latest-offset', \
>     'format.type' = 'json', \
>     'connector.properties.retries' = '3', \
>     'connector.properties.update_mode' = 'append' \
> )")
>
> @udf(input_types=DataTypes.STRING(), result_type=DataTypes.ROW())
> def json_split(message):
>     return json.loads(message.replace('\"', '"').replace('"{"', '{"').replace('}"', '}'))
> table_env.register_function("json_split", json_split)
>
> table_env.sql_update("insert into flink_sinktable_ad_test \
>     select \
>         json_split(message) AS message \
>     from \
>         flink_sourcetable_ad_test \
> ")
> table_env.execute('flink_1.10_test')
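A side note on the quoted json_split udf: DataTypes.ROW() is declared without any field information, and json.loads returns a Python dict rather than a flat string, so jincheng's point applies: the result cannot be written into the STRING sink column. A hedged sketch of an alternative that keeps the sink column as STRING (an editor's illustration, not code from this thread, assuming the source's JSON format has already unescaped the message field):

import json
from pyflink.table import DataTypes
from pyflink.table.udf import udf

@udf(input_types=DataTypes.STRING(), result_type=DataTypes.STRING())
def json_split(message):
    # The outer JSON is already parsed by the source's JSON format, so the
    # message column should hold the inner JSON text directly; re-serialize
    # it so the udf hands the planner a plain string, not a dict.
    try:
        return json.dumps(json.loads(message))
    except (ValueError, TypeError):
        return message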
ERROR log:
Job has been submitted with JobID 91ac323d4d5338418883240680192f34

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/tmp/dee3edf4-08ff-4c2c-87aa-fef573fae884pyflink.zip/pyflink/table/table_environment.py", line 907, in execute
  File "/tmp/dee3edf4-08ff-4c2c-87aa-fef573fae884py4j-0.10.8.1-src.zip/py4j/java_gateway.py", line 1286, in __call__
  File "/tmp/dee3edf4-08ff-4c2c-87aa-fef573fae884pyflink.zip/pyflink/util/exceptions.py", line 147, in deco
  File "/tmp/dee3edf4-08ff-4c2c-87aa-fef573fae884py4j-0.10.8.1-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o23.execute.
: java.util.concurrent.ExecutionException: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: 91ac323d4d5338418883240680192f34)
    at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
    at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
    at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:83)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1620)
    at org.apache.flink.table.planner.delegation.StreamExecutor.execute(StreamExecutor.java:42)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:643)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
    at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
    at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: 91ac323d4d5338418883240680192f34)
    at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:112)
    at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:602)
    at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
    at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
    at org.apache.flink.client.program.rest.RestClusterClient.lambda$pollResourceAsync$21(RestClusterClient.java:565)
    at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
    at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
    at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
    at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$8(FutureUtils.java:291)
    at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
    at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
    at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:561)
    at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:929)
    at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    ... 1 more
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
    at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:147)
    at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:110)
    ... 19 more
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by FixedDelayRestartBackoffTimeStrategy(maxNumberRestartAttempts=50, backoffTimeMS=6)
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:110)
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:76)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:192)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:186)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:180)
    at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:484)
    at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:380)
    at sun.reflect.GeneratedMethodAccessor64.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:279)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:194)
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
    at akka.actor.ActorCell.invoke(ActorCell.scala:561)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
    at akka.dispatch.Mailbox.run(Mailbox.scala:225)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.UncheckedExecutionException: java.lang.IllegalStateException: Process died with exit code 0
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2050)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.get(LocalCache.java:3952)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3974)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4958)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalLoadingCache.getUnchecked(LocalCache.java:4964)
    at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory.<init>(DefaultJobBundleFactory.java:211)
    at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory.<init>(DefaultJobBundleFactory.java:202)
    at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory.forStage(DefaultJobBundleFactory.java:185)
    at org.apache.flink.python.AbstractPythonFunctionRunner.open(AbstractPythonFunctionRunner.java:179)
    at org.apache.flink.table.runtime.operators.python.AbstractPythonScalarFunctionOperator$ProjectUdfInputPythonScalarFunctionRunner.open(AbstractPythonScalarFunctionOperator.java:193)
    at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.open(AbstractPythonFunctionOperator.java:139)
    at org.apache.flink.table.runtime.operators.python.AbstractPythonScalarFunctionOperator.open(AbstractPythonScalarFunctionOperator.java:143)
    at org.apache.flink.table.runtime.operators.python.BaseRowPythonScalarFunctionOperator.open(BaseRowPythonScalarFunctionOperator.java:86)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:1007)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:454)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:449)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalStateException: Process died with exit code 0
    at org.apache.beam.runners.fnexecution.environment.ProcessManager$RunningProcess.isAliveOrThrow(ProcessManager.java:74)
    at org.apache.beam.runners.fnexecution.environment.ProcessEnvironmentFactory.createEnvironment(ProcessEnvironmentFactory.java:125)
    at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$1.load(DefaultJobBundleFactory.java:178)
    at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$1.load(DefaultJobBundleFactory.java:162)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3528)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2277)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2154)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2044)
    ... 20 more
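On the root cause: the innermost exception, "Process died with exit code 0" from Beam's ProcessManager, means the Python worker process that would run the udf died during startup, before handling a single record, which also matches the dashboard showing 0 bytes/records received. Incidentally, RestartStrategies.fixed_delay_restart(50, 6) shows up above as backoffTimeMS=6, i.e. a 6 millisecond backoff, which may not be what was intended. A hedged first check (an illustration, not from the thread) is to confirm the environment with the same interpreter the TaskManagers launch:

# Hypothetical environment check, run with the Python interpreter the
# TaskManagers use; PyFlink 1.10 requires Python 3.5 or newer and runs
# udfs on a Beam-based worker harness.
import sys
assert sys.version_info >= (3, 5), sys.version

import pyflink       # an ImportError here would explain a worker dying on startup
import apache_beam   # installed as a dependency of the apache-flink wheel

print(sys.executable)
print(pyflink.__file__)
print(apache_beam.__version__)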
{"host":"172.25.69.145","@timestamp":"2020-03-23T09:21:16.315Z","@version":"1","offset":532178261,"logmark":"advertise-api","type":"squirrel","path":"/home/logs/app/action_log/advertise_api/AdStatistic.2020-03-23.log","message":"{\"date_time\":\"2020-03-23 17:21:15\",\"serverTime\":1584955275,\"currentTime\":\"1584955274\",\"distinct_id\":\"734232168\",\"device_id\":\"ad2c15b7cf910cc6\",\"event\":\"view_material\",\"click_pos\":\"ad_view_avatar\",\"material_name\":\"\\u5934\\u50cf\\u66dd\\u5149\",\"phase\":\"prod\",\"source\":\"phone\",\"client_v\":\"2.100\",\"platform\":\"android\",\"ip\":\"39.84.23.81\",\"network\":\"WIFI\",\"idfa\":\"867179032526091\",\"unique_device_id\":\"ad2c15b7cf910cc6\",\"manufacturer\":\"HUAWEI\",\"model\":\"PRA-AL00X\",\"carrier\":\"\",\"local_dns\":\"0.0,0.0\",\"user_id\":\"734232168\",\"is_login\":1,\"url_path\":\"http:\\/\\/down-ddz.734399.com <http://down-ddz.734399.com/>\\/download\\/shuabao\\/shuabao10-awddz-release-hl.apk\",\"country\":\"\",\"province\":\"\",\"city\":\"\",\"setup_source\":\"androiddefault\",\"seller_id\":\"139091\",\"plan_id\":\"380141\",\"plan_name\":\"HL-10-12.27-\\u5927\\u80f8-\\u6e38\\u620f\",\"put_platform_id\":\"25\",\"put_ad_type_id\":\"27\",\"put_position_id\":\"0\",\"put_sell_type_id\":\"0\",\"material_id\":\"378120\",\"show_id\":\"SMALL_VIDEO_6948314\",\"put_start_time\":\"1577808000\",\"plan_params\":\"advertise-380141-378120\",\"download_app_name\":\"\\u7231\\u73a9\\u6597\\u5730\\u4e3b\",\"ad_type\":\"0\",\"put_source\":\"1\",\"is_ad_recommend\":\"\",\"third_video_url\":\"\",\"third_img_url\":\"\",\"ua\":\"JuMei\\/ (PRA-AL00X; Android; Android OS ; 8.0.0; zh) ApacheHttpClient\\/4.0\",\"platform_v\":\"2.100\",\"played_time\":\"\",\"video_time\":\"\",\"status\":1,\"target_link\":\"http:\\/\\/down-ddz.734399.com <http://down-ddz.734399.com/>\\/download\\/shuabao\\/shuabao10-awddz-release-hl.apk\",\"ad_material_title\":\"\\u5de5\\u8d44\\u6ca13W\\uff0c\\u5c31\\u73a9\\u8fd9\\u6597\\u5730\\u4e3b\\uff0c\\u6bcf\\u5929\\u90fd\\u80fd\\u9886\\u7ea2\\u5305~\",\"ad_material_desc\":\"\\u73a9\\u6597\\u5730\\u4e3b\\uff0c\\u7ea2\\u5305\\u79d2\\u4f53\\u73b0\",\"icon_url\":\"http:\\/\\/p12.jmstatic.com <http://p12.jmstatic.com/>\\/adv\\/\\/material\\/20190920\\/jmadv5d849b203750f.png\",\"button_text\":\"\\u7acb\\u5373\\u4e0b\\u8f7d\",\"material_type\":\"video\",\"third_app_id\":\"\",\"third_pos_id\":\"\",\"download_apk\":\"\",\"package_name\":\"\",\"h5_url\":\"\",\"message\":\"\",\"unit_price\":\"4.15\",\"balance_type\":\"3\",\"ecpm\":\"15.189\",\"accumulate_ctr\":\"0.366\",\"pre_ctr\":\"0.366\",\"real_unit_price\":\"0\",\"pre_charge\":\"0\",\"pre_change\":\"0\",\"subsidy_type\":\"\",\"subsidy_ratio\":\"\",\"suppress_type\":\"0\",\"suppress_ratio\":\"0\",\"two_ecpm\":\"15.165\",\"real_ecpm\":\"0\",\"real_unit_price_threshold\":\"0\",\"plan_accumulate_pv\":\"0\",\"plan_accumulate_cpv\":\"0\",\"plan_accumulate_change\":\"0\",\"request_id\":\"ad2c15b7cf910cc6-1584955121.768552967\",\"ad_activity_type\":\"0\",\"one_cost\":\"4.15\",\"bid\":\"0.366\",\"real_spr\":\"0.998419\",\"one_ecpm\":\"15.189\",\"es_one\":\"0,16,\",\"es_two\":\"0\",\"es_three\":\"0\",\"es_four\":\"0\",\"age\":\"1020\",\"sex\":\"0\",\"one_cost_expend\":\"4.15\",\"two_cost_expend\":\"4.153438\",\"reward_source\":\"\",\"provide\":\"\"}","topicid":"advertise_module","project":"advertise-api”} >> Problem: >> 数据是个嵌套json,并且核心字段message的格式不能直接通过table api json 相关的方法来处理。 >> 自己思考的解决思路:通过udf, 使用json.loads来处理。 >> 实际使用中遇到的问题: job提交之后,在dashboard上发现bytes received,records recevied 
都是0;下游是同步给Kafka,去消费下游kafka也没有数据。 >> >> >> Code as below: >> from pyflink.datastream import StreamExecutionEnvironment,CheckpointingMode,TimeCharacteristic >> from pyflink.table import StreamTableEnvironment, EnvironmentSettings,TableSink,TableConfig,DataTypes >> from pyflink.table.descriptors import Schema,Kafka,Rowtime,ConnectTableDescriptor,CustomConnectorDescriptor,Json >> from pyflink.common import RestartStrategies >> from pyflink.table.udf import udf >> import json >> >> env = StreamExecutionEnvironment.get_execution_environment() >> #env.set_stream_time_characteristic(TimeCharacteristic.EventTime) >> ##checkpoint设置 >> #env.enable_checkpointing(300000) >> #env.get_checkpoint_config().set_checkpointing_mode(CheckpointingMode().EXACTLY_ONCE) >> #env.get_checkpoint_config().set_min_pause_between_checkpoints(30000) >> #env.get_checkpoint_config().set_checkpoint_timeout(60000) >> #env.get_checkpoint_config().set_max_concurrent_checkpoints(1) >> #env.get_checkpoint_config().set_prefer_checkpoint_for_recovery(False) >> ##contain设置 >> env.set_parallelism(12) >> env.set_restart_strategy(RestartStrategies.fixed_delay_restart(50,6)) >> ##使用blink api >> environment_settings = EnvironmentSettings.new_instance().use_blink_planner().in_streaming_mode().build() >> table_env = StreamTableEnvironment.create(env,environment_settings=environment_settings) >> >> >> table_env.sql_update("CREATE TABLE flink_sourcetable_ad_test ( \ >> host STRING, \ >> type STRING, \ >> topicid STRING, \ >> message STRING, \ >> proctime as PROCTIME() \ >> ) WITH ( \ >> 'connector.type' = 'kafka', \ >> 'connector.version' = 'universal', \ >> 'connector.topic' = 'advertise_module', \ >> 'connector.properties.zookeeper.connect' = 'localhost:2181', \ >> 'connector.properties.bootstrap.servers' = 'localhost:9092', \ >> 'connector.properties.group.id <http://connector.properties.group.id/>' = 'flink_1.10_test_source', \ >> 'connector.startup-mode' = 'latest-offset', \ >> 'format.type' = 'json', \ >> 'format.derive-schema' = 'true' \ >> )") >> >> >> table_env.sql_update("CREATE TABLE flink_sinktable_ad_test ( \ >> message STRING \ >> ) WITH ( \ >> 'connector.type' = 'kafka', \ >> 'connector.version' = 'universal', \ >> 'connector.topic' = 'recommend_user_concern_test', \ >> 'connector.properties.zookeeper.connect' = 'localhost:2181', \ >> 'connector.properties.bootstrap.servers' = 'localhost:9092', \ >> 'connector.properties.group.id <http://connector.properties.group.id/>' = 'flink_1.10_test_sink', \ >> 'connector.startup-mode' = 'latest-offset', \ >> 'format.type' = 'json', \ >> 'connector.properties.retries' = '3', \ >> 'connector.properties.update_mode' = 'append' \ >> )") >> >> @udf(input_types=DataTypes.STRING(), result_type=DataTypes.ROW()) >> def json_split(message): >> return json.loads(message.replace('\"', '"').replace('"{"', '{"').replace('}"', '}')) >> table_env.register_function("json_split", json_split) >> >> table_env.sql_update("insert into flink_sinktable_ad_test \ >> select \ >> json_split(message) AS message\ >> from \ >> flink_sourcetable_ad_test \ >> ") >> table_env.execute('flink_1.10_test') >> > |