Re: [udf questions]


Re: [udf questions]

jincheng sun
One fairly obvious problem is in the UDF definition: your sink table declares a STRING column, but your SQL SELECTs the result of the udf, so the UDF should also return a STRING, i.e. result_type=DataTypes.STRING(). Also make sure the udf really returns a string, not a JSON object.

Best,
Jincheng
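
A minimal sketch along those lines (just an illustration, assuming the goal is still to forward the whole parsed message as one string column; json_flatten is a made-up name):

import json
from pyflink.table import DataTypes
from pyflink.table.udf import udf

@udf(input_types=DataTypes.STRING(), result_type=DataTypes.STRING())
def json_flatten(message):
  # Parse the nested payload, then re-serialize it so the function returns a
  # plain str, matching the STRING column declared on the sink table.
  try:
    return json.dumps(json.loads(message), ensure_ascii=False)
  except (ValueError, TypeError):
    # Pass malformed records through unchanged instead of failing the whole job.
    return message

# registered the same way as before:
# table_env.register_function("json_flatten", json_flatten)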


On Thu, Mar 26, 2020 at 5:24 PM, WuPangang <[hidden email]> wrote:
Data as below:
 {"host":"172.25.69.145","@timestamp":"2020-03-23T09:21:16.315Z","@version":"1","offset":532178261,"logmark":"advertise-api","type":"squirrel","path":"/home/logs/app/action_log/advertise_api/AdStatistic.2020-03-23.log","message":"{\"date_time\":\"2020-03-23 17:21:15\",\"serverTime\":1584955275,\"currentTime\":\"1584955274\",\"distinct_id\":\"734232168\",\"device_id\":\"ad2c15b7cf910cc6\",\"event\":\"view_material\",\"click_pos\":\"ad_view_avatar\",\"material_name\":\"\\u5934\\u50cf\\u66dd\\u5149\",\"phase\":\"prod\",\"source\":\"phone\",\"client_v\":\"2.100\",\"platform\":\"android\",\"ip\":\"39.84.23.81\",\"network\":\"WIFI\",\"idfa\":\"867179032526091\",\"unique_device_id\":\"ad2c15b7cf910cc6\",\"manufacturer\":\"HUAWEI\",\"model\":\"PRA-AL00X\",\"carrier\":\"\",\"local_dns\":\"0.0,0.0\",\"user_id\":\"734232168\",\"is_login\":1,\"url_path\":\"http:\\/\\/down-ddz.734399.com\\/download\\/shuabao\\/shuabao10-awddz-release-hl.apk\",\"country\":\"\",\"province\":\"\",\"city\":\"\",\"setup_source\":\"androiddefault\",\"seller_id\":\"139091\",\"plan_id\":\"380141\",\"plan_name\":\"HL-10-12.27-\\u5927\\u80f8-\\u6e38\\u620f\",\"put_platform_id\":\"25\",\"put_ad_type_id\":\"27\",\"put_position_id\":\"0\",\"put_sell_type_id\":\"0\",\"material_id\":\"378120\",\"show_id\":\"SMALL_VIDEO_6948314\",\"put_start_time\":\"1577808000\",\"plan_params\":\"advertise-380141-378120\",\"download_app_name\":\"\\u7231\\u73a9\\u6597\\u5730\\u4e3b\",\"ad_type\":\"0\",\"put_source\":\"1\",\"is_ad_recommend\":\"\",\"third_video_url\":\"\",\"third_img_url\":\"\",\"ua\":\"JuMei\\/ (PRA-AL00X; Android; Android OS ; 8.0.0; zh) ApacheHttpClient\\/4.0\",\"platform_v\":\"2.100\",\"played_time\":\"\",\"video_time\":\"\",\"status\":1,\"target_link\":\"http:\\/\\/down-ddz.734399.com\\/download\\/shuabao\\/shuabao10-awddz-release-hl.apk\",\"ad_material_title\":\"\\u5de5\\u8d44\\u6ca13W\\uff0c\\u5c31\\u73a9\\u8fd9\\u6597\\u5730\\u4e3b\\uff0c\\u6bcf\\u5929\\u90fd\\u80fd\\u9886\\u7ea2\\u5305~\",\"ad_material_desc\":\"\\u73a9\\u6597\\u5730\\u4e3b\\uff0c\\u7ea2\\u5305\\u79d2\\u4f53\\u73b0\",\"icon_url\":\"http:\\/\\/p12.jmstatic.com\\/adv\\/\\/material\\/20190920\\/jmadv5d849b203750f.png\",\"button_text\":\"\\u7acb\\u5373\\u4e0b\\u8f7d\",\"material_type\":\"video\",\"third_app_id\":\"\",\"third_pos_id\":\"\",\"download_apk\":\"\",\"package_name\":\"\",\"h5_url\":\"\",\"message\":\"\",\"unit_price\":\"4.15\",\"balance_type\":\"3\",\"ecpm\":\"15.189\",\"accumulate_ctr\":\"0.366\",\"pre_ctr\":\"0.366\",\"real_unit_price\":\"0\",\"pre_charge\":\"0\",\"pre_change\":\"0\",\"subsidy_type\":\"\",\"subsidy_ratio\":\"\",\"suppress_type\":\"0\",\"suppress_ratio\":\"0\",\"two_ecpm\":\"15.165\",\"real_ecpm\":\"0\",\"real_unit_price_threshold\":\"0\",\"plan_accumulate_pv\":\"0\",\"plan_accumulate_cpv\":\"0\",\"plan_accumulate_change\":\"0\",\"request_id\":\"ad2c15b7cf910cc6-1584955121.768552967\",\"ad_activity_type\":\"0\",\"one_cost\":\"4.15\",\"bid\":\"0.366\",\"real_spr\":\"0.998419\",\"one_ecpm\":\"15.189\",\"es_one\":\"0,16,\",\"es_two\":\"0\",\"es_three\":\"0\",\"es_four\":\"0\",\"age\":\"1020\",\"sex\":\"0\",\"one_cost_expend\":\"4.15\",\"two_cost_expend\":\"4.153438\",\"reward_source\":\"\",\"provide\":\"\"}","topicid":"advertise_module","project":"advertise-api”}
Problem:
The data is nested JSON, and the core field message is in a format that cannot be handled directly with the Table API's JSON-related methods.
My own idea for a solution: handle it with a udf that uses json.loads.
Problem encountered in practice: after the job is submitted, the dashboard shows both bytes received and records received as 0; the downstream side writes to Kafka, and consuming the downstream Kafka topic yields no data either.
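
For reference, a trimmed plain-Python illustration of that idea (values shortened from the sample above): once the outer record is parsed, the message column holds the inner JSON as ordinary text, which json.loads can turn into a dict directly.

import json

outer = json.loads('{"host": "172.25.69.145", "message": "{\\"event\\": \\"view_material\\", \\"user_id\\": \\"734232168\\"}"}')
inner = json.loads(outer["message"])  # the escaped inner payload parses into a dict
print(inner["event"])                 # view_material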

Code as below:
from pyflink.datastream import StreamExecutionEnvironment,CheckpointingMode,TimeCharacteristic
from pyflink.table import StreamTableEnvironment, EnvironmentSettings,TableSink,TableConfig,DataTypes
from pyflink.table.descriptors import Schema,Kafka,Rowtime,ConnectTableDescriptor,CustomConnectorDescriptor,Json
from pyflink.common import RestartStrategies
from pyflink.table.udf import udf
import json

env = StreamExecutionEnvironment.get_execution_environment()
#env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
##checkpoint settings
#env.enable_checkpointing(300000)
#env.get_checkpoint_config().set_checkpointing_mode(CheckpointingMode().EXACTLY_ONCE)
#env.get_checkpoint_config().set_min_pause_between_checkpoints(30000)
#env.get_checkpoint_config().set_checkpoint_timeout(60000)
#env.get_checkpoint_config().set_max_concurrent_checkpoints(1)
#env.get_checkpoint_config().set_prefer_checkpoint_for_recovery(False)
##container settings
env.set_parallelism(12)
env.set_restart_strategy(RestartStrategies.fixed_delay_restart(50,6))
##use the blink api
environment_settings = EnvironmentSettings.new_instance().use_blink_planner().in_streaming_mode().build()
table_env = StreamTableEnvironment.create(env,environment_settings=environment_settings)


table_env.sql_update("CREATE TABLE flink_sourcetable_ad_test ( \
host STRING, \
type STRING, \
topicid STRING, \
message STRING, \
proctime as PROCTIME() \
) WITH ( \
  'connector.type' = 'kafka',        \
  'connector.version' = 'universal', \
  'connector.topic' = 'advertise_module',  \
  'connector.properties.zookeeper.connect' = 'localhost:2181', \
  'connector.properties.bootstrap.servers' = 'localhost:9092', \
  'connector.properties.group.id' = 'flink_1.10_test_source', \
  'connector.startup-mode' = 'latest-offset', \
  'format.type' = 'json', \
  'format.derive-schema' = 'true' \
)")


table_env.sql_update("CREATE TABLE flink_sinktable_ad_test ( \
message STRING \
) WITH ( \
  'connector.type' = 'kafka',        \
  'connector.version' = 'universal', \
  'connector.topic' = 'recommend_user_concern_test',  \
  'connector.properties.zookeeper.connect' = 'localhost:2181', \
  'connector.properties.bootstrap.servers' = 'localhost:9092', \
  'connector.properties.group.id' = 'flink_1.10_test_sink', \
  'connector.startup-mode' = 'latest-offset', \
  'format.type' = 'json', \
  'connector.properties.retries' = '3', \
  'connector.properties.update_mode' = 'append' \
)")

@udf(input_types=DataTypes.STRING(), result_type=DataTypes.ROW())
def json_split(message):
  return json.loads(message.replace('\"', '"').replace('"{"', '{"').replace('}"', '}'))
table_env.register_function("json_split", json_split)

table_env.sql_update("insert into flink_sinktable_ad_test \
                        select \
                        json_split(message) AS message\
                        from \
                        flink_sourcetable_ad_test \
                        ")
table_env.execute('flink_1.10_test')


Re: [udf questions]

WuPangang
Thanks a lot for the reply.
Following the hints in the email, I tried the following:

@udf(input_types=DataTypes.STRING(), result_type=DataTypes.STRING())
def str_add(str_name):
  return '1'
table_env.register_function("str_add", str_add)
table_env.sql_update("insert into flink_sinktable_ad_test_1 \
                        select \
                        str_add(topicid) AS topicid \
                        from \
                        flink_sourcetable_ad_test \
                        ")
Goal: I wanted to check, in the simplest possible way, whether the udf takes effect at all.
Result: still no data flowing in.
Other checks and tests: I verified the data flow without the udf, and it works fine.


So could you take another look? Or how should I dig into this further?
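
One low-effort way to narrow it down (just a sketch, not specific to this failure): keep the parsing logic in a plain Python function that can be exercised locally without a cluster, and only then wrap it with udf(), for example:

import json
from pyflink.table import DataTypes
from pyflink.table.udf import udf

def parse_message(message):
  # plain function, testable with ordinary Python
  return json.dumps(json.loads(message), ensure_ascii=False)

# quick local check with a hypothetical trimmed payload
assert json.loads(parse_message('{"event": "view_material"}'))["event"] == "view_material"

json_split = udf(parse_message, DataTypes.STRING(), DataTypes.STRING())
# table_env.register_function("json_split", json_split)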

--------
all code below:
from pyflink.datastream import StreamExecutionEnvironment,CheckpointingMode,TimeCharacteristic
from pyflink.table import StreamTableEnvironment, EnvironmentSettings,TableSink,TableConfig,DataTypes
from pyflink.table.descriptors import Schema,Kafka,Rowtime,ConnectTableDescriptor,CustomConnectorDescriptor,Json
from pyflink.common import RestartStrategies
from pyflink.table.udf import udf
import json

env = StreamExecutionEnvironment.get_execution_environment()
##container settings
env.set_parallelism(12)
env.set_restart_strategy(RestartStrategies.fixed_delay_restart(50,6))
##use the blink api
environment_settings = EnvironmentSettings.new_instance().use_blink_planner().in_streaming_mode().build()
table_env = StreamTableEnvironment.create(env,environment_settings=environment_settings)

table_env.sql_update("CREATE TABLE flink_sourcetable_ad_test ( \
host STRING, \
type STRING, \
topicid STRING, \
message STRING, \
proctime as PROCTIME() \
) WITH ( \
  'connector.type' = 'kafka',        \
  'connector.version' = 'universal', \
  'connector.topic' = 'advertise_module',  \
  'connector.properties.zookeeper.connect' = 'localhost:2181', \
  'connector.properties.bootstrap.servers' = '172.25.80.134:9092', \
  'connector.properties.group.id' = 'flink_1.10_test_source', \
  'connector.startup-mode' = 'latest-offset', \
  'format.type' = 'json' \
)")

table_env.sql_update("CREATE TABLE flink_sinktable_ad_test_1 ( \
topicid STRING \
) WITH ( \
  'connector.type' = 'kafka',        \
  'connector.version' = 'universal', \
  'connector.topic' = 'recommend_user_concern_test',  \
  'connector.properties.zookeeper.connect' = 'localhost:2181', \
  'connector.properties.bootstrap.servers' = '172.25.82.77:9092', \
  'connector.properties.group.id' = 'flink_1.10_test_sink', \
  'connector.startup-mode' = 'latest-offset', \
  'connector.properties.retries' = '3', \
  'format.type' = 'json', \
  'connector.properties.update_mode' = 'append' \
)")
@udf(input_types=DataTypes.STRING(), result_type=DataTypes.STRING())
def str_add(str_name):
  return '1'
table_env.register_function("str_add", str_add)
#table_env.register_function("str_add", udf(lambda i: i + '1', DataTypes.STRING(), DataTypes.STRING()))
table_env.sql_update("insert into flink_sinktable_ad_test_1 \
                        select \
                        str_add(topicid) AS topicid \
                        from \
                        flink_sourcetable_ad_test \
                        ")
table_env.execute('flink_1.10_test')


Re: [udf questions]

WuPangang
ERROR log:
.
Job has been submitted with JobID 91ac323d4d5338418883240680192f34
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/tmp/dee3edf4-08ff-4c2c-87aa-fef573fae884pyflink.zip/pyflink/table/table_environment.py", line 907, in execute
  File "/tmp/dee3edf4-08ff-4c2c-87aa-fef573fae884py4j-0.10.8.1-src.zip/py4j/java_gateway.py", line 1286, in __call__
  File "/tmp/dee3edf4-08ff-4c2c-87aa-fef573fae884pyflink.zip/pyflink/util/exceptions.py", line 147, in deco
  File "/tmp/dee3edf4-08ff-4c2c-87aa-fef573fae884py4j-0.10.8.1-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o23.execute.
: java.util.concurrent.ExecutionException: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: 91ac323d4d5338418883240680192f34)
        at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
        at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
        at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:83)
        at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1620)
        at org.apache.flink.table.planner.delegation.StreamExecutor.execute(StreamExecutor.java:42)
        at org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:643)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
        at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
        at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: 91ac323d4d5338418883240680192f34)
        at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:112)
        at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:602)
        at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
        at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
        at org.apache.flink.client.program.rest.RestClusterClient.lambda$pollResourceAsync$21(RestClusterClient.java:565)
        at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
        at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
        at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
        at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$8(FutureUtils.java:291)
        at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
        at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
        at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:561)
        at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:929)
        at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        ... 1 more
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
        at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:147)
        at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:110)
        ... 19 more
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by FixedDelayRestartBackoffTimeStrategy(maxNumberRestartAttempts=50, backoffTimeMS=6)
        at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:110)
        at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:76)
        at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:192)
        at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:186)
        at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:180)
        at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:484)
        at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:380)
        at sun.reflect.GeneratedMethodAccessor64.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:279)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:194)
        at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
        at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
        at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
        at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
        at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
        at akka.actor.ActorCell.invoke(ActorCell.scala:561)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
        at akka.dispatch.Mailbox.run(Mailbox.scala:225)
        at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
        at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.UncheckedExecutionException: java.lang.IllegalStateException: Process died with exit code 0
        at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2050)
        at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.get(LocalCache.java:3952)
        at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3974)
        at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4958)
        at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalLoadingCache.getUnchecked(LocalCache.java:4964)
        at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory.<init>(DefaultJobBundleFactory.java:211)
        at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory.<init>(DefaultJobBundleFactory.java:202)
        at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory.forStage(DefaultJobBundleFactory.java:185)
        at org.apache.flink.python.AbstractPythonFunctionRunner.open(AbstractPythonFunctionRunner.java:179)
        at org.apache.flink.table.runtime.operators.python.AbstractPythonScalarFunctionOperator$ProjectUdfInputPythonScalarFunctionRunner.open(AbstractPythonScalarFunctionOperator.java:193)
        at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.open(AbstractPythonFunctionOperator.java:139)
        at org.apache.flink.table.runtime.operators.python.AbstractPythonScalarFunctionOperator.open(AbstractPythonScalarFunctionOperator.java:143)
        at org.apache.flink.table.runtime.operators.python.BaseRowPythonScalarFunctionOperator.open(BaseRowPythonScalarFunctionOperator.java:86)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:1007)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:454)
        at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:449)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalStateException: Process died with exit code 0
        at org.apache.beam.runners.fnexecution.environment.ProcessManager$RunningProcess.isAliveOrThrow(ProcessManager.java:74)
        at org.apache.beam.runners.fnexecution.environment.ProcessEnvironmentFactory.createEnvironment(ProcessEnvironmentFactory.java:125)
        at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$1.load(DefaultJobBundleFactory.java:178)
        at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$1.load(DefaultJobBundleFactory.java:162)
        at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3528)
        at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2277)
        at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2154)
        at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2044)
        ... 20 more


