请教一个问题:从一个mysql表到另一个MySQL的操作,单机linux下运行,出现如下错误:
py4j.protocol.Py4JJavaError: An error occurred while calling o6.execute. : java.util.concurrent.ExecutionException: org.apache.flink.runtime.rest.util.RestClientException: [org.apache.flink.runtime.rest.handler.RestHandlerException: org.apache.flink.runtime.messages.FlinkJobNotFoundException: Could not find Flink job (4d6b1273229e0e16fa433c652b5cb74d) 代码如下: from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic, CheckpointingMode from pyflink.table import StreamTableEnvironment, EnvironmentSettings, DataTypes batch_source_ddl = """ CREATE TABLE mh_source_tab ( lid INT, dir INT, posid BIGINT, km BIGINT, poleId BIGINT, ts BIGINT, rt Decimal(6,2), time1 VARCHAR ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:mysql://**', 'table-name' = 'nj_mh_test', --基础数据表 'driver' = 'com.mysql.cj.jdbc.Driver', 'username' = 'root', 'password' = 'root', --'sink.buffer-flush.max-rows' = '100', 'sink.buffer-flush.interval' = '1s' ) warn_alarm_mh_ddl = """ CREATE TABLE warn_alarm_mh_sink ( lid INT, dir INT, posid BIGINT, km BIGINT, poleId BIGINT, extremum Decimal(6,2), PRIMARY KEY (lid,dir,posid,poleId) NOT ENFORCED ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:mysql://**', 'table-name' = 'warn_mh_alarm_result', --结果数据表 'driver' = 'com.mysql.cj.jdbc.Driver', 'username' = 'root', 'password' = 'root', --'sink.buffer-flush.max-rows' = '100', 'sink.buffer-flush.interval' = '1s' ) """ env = StreamExecutionEnvironment.get_execution_environment() env.set_parallelism(1) env_settings = EnvironmentSettings.new_instance().use_blink_planner().in_streaming_mode().build() t_env = StreamTableEnvironment.create(env, environment_settings=env_settings) t_env.execute_sql(batch_source_ddl ) t_env.execute_sql(warn_alarm_mh_ddl) def threshold_alarm(delta_thres): source = t_env.from_path("mh_source_tab") \ .where("rt <> -10000")\ .group_by("lid, dir, posid, km, poleId")\ .select("lid, dir, posid, km, poleId, max(rt) as max_rt, min(rt) as min_rt, max(rt)-min(rt) as extremum")\ .where("extremum >"+str(delta_thres))\ .select("lid, dir, posid, km, poleId, extremum") source.execute_insert("warn_alarm_mh_sink") \ .get_job_client() \ .get_job_execution_result() \ .result() if __name__ == '__main__': threshold_alarm(delta_thres=0.5) |
Free forum by Nabble | Edit this page |