Hello,

I'm writing a job with the Flink 1.10.1 DataStream API. In two different RichFunctions I call Class.forName("*****") to load two different JDBC database drivers. This causes the job to hang and stop making progress. Has anyone run into this?

[hidden email]
Hi,
At which step does it get stuck? Could you provide some of the following to help pin down the problem?

1. A screenshot or the logs
2. Which two databases are involved, and their versions
3. Whether a standalone test case that loads both drivers succeeds
4. Pseudocode of the job

Best,
Yichao Yang
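For item 3, a minimal standalone check might look like the sketch below (a plain main class so it needs no test framework; the driver class names are taken from the code later in this thread, and both driver jars are assumed to be on the classpath). It loads the two drivers from two threads at once, which is closer to what happens when two task threads enter open() at the same time:

import java.util.concurrent.CountDownLatch;

public class DriverLoadingCheck {

    public static void main(String[] args) throws Exception {
        // Release both threads at the same moment so the two drivers are
        // initialized concurrently, as they would be in two task threads.
        CountDownLatch start = new CountDownLatch(1);
        Thread t1 = new Thread(() -> load(start, "org.apache.hive.jdbc.HiveDriver"));
        Thread t2 = new Thread(() -> load(start, "com.mysql.jdbc.Driver"));
        t1.start();
        t2.start();
        start.countDown();
        // Give each thread 30 seconds; if either is still alive, loading hung.
        t1.join(30_000);
        t2.join(30_000);
        System.out.println((t1.isAlive() || t2.isAlive())
                ? "concurrent driver loading HUNG"
                : "concurrent driver loading OK");
    }

    private static void load(CountDownLatch start, String driverClass) {
        try {
            start.await();
            Class.forName(driverClass);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

If this hangs outside Flink too, the problem is in driver class initialization rather than in the job itself.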
In reply to this post by 1101300123
FlinkKafkaConsumer<Bill> kafkaConsumer =
        new FlinkKafkaConsumer<>(TrafficConstants.BILLTOPIC, new SchargeConsumerSchema(), props);
kafkaConsumer.setStartFromLatest();

SingleOutputStreamOperator<Bill> process = env.addSource(kafkaConsumer).setParallelism(4)
        .filter(new HiveFilterFunction(TrafficConstants.HIVEURL,
                TrafficConstants.HIVEUSERNAME, TrafficConstants.HIVEPASSWORD))
        .name("流量费过滤")
        .keyBy((KeySelector<Bill, String>) value ->
                value.getUser_id() + value.getSerial_number() + value.getProvince_code())
        .process(***);

SingleOutputStreamOperator<BillInfo> map = process.map();

map.addSink(new RdsFlowSink(TrafficConstants.URL, TrafficConstants.USERNAME, TrafficConstants.PASSWORD))
        .setParallelism(1)
        .name("sinkRds");

The main logic is: read from Kafka --> custom RichFilterFunction that loads Hive dimension-table data to filter records --> keyBy --> process --> custom sink.

public class HiveFilterFunction extends RichFilterFunction<Bill> {

    Logger LOG = LoggerFactory.getLogger(HiveFilterFunction.class);

    private final String jdbcUrl;
    private final String username;
    private final String password;
    private transient volatile Statement sts;
    private transient volatile Connection connection;
    Map<String, String> map = new ConcurrentHashMap<>();

    public HiveFilterFunction(String jdbcUrl, String username, String password) {
        this.jdbcUrl = jdbcUrl;
        this.username = username;
        this.password = password;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        Class.forName("org.apache.hive.jdbc.HiveDriver");
        connection = DriverManager.getConnection(jdbcUrl, username, password);
        LOG.info("hive connection --- " + connection);
        sts = connection.createStatement();
        query();
    }

    @Override
    public boolean filter(Bill value) {
        return map.containsKey(value.getIntegrate_item_code())
                && TrafficConstants.getProCode().contains(value.getProvince_code());
    }

    @Override
    public void close() throws Exception {
        super.close();
        if (sts != null) {
            sts.close();
        }
        if (connection != null) {
            connection.close();
        }
    }

    // Load the Hive dimension table once in open() and cache the codes.
    private void query() throws Exception {
        ResultSet resultSet = null;
        try {
            sts.execute(TrafficConstants.SETSQL);
            resultSet = sts.executeQuery(TrafficConstants.CODESQL);
            while (resultSet.next()) {
                map.put(resultSet.getString("charge_code_cbss"), "");
            }
        } catch (Exception e) {
            LOG.error("hive error", e);
            throw new Exception(e);
        } finally {
            if (resultSet != null) {
                resultSet.close();
            }
        }
        LOG.info("hive dimension-table data loaded");
    }
}

public class RdsFlowSink extends RichSinkFunction<BillInfo> {

    Logger LOG = LoggerFactory.getLogger(RdsFlowSink.class);

    private final String url;
    private final String name;
    private final String password;
    private transient volatile PreparedStatement insertStatement;
    private transient volatile Connection connection;
    private transient volatile Counter counter = null;

    public RdsFlowSink(String url, String name, String password) {
        this.url = url;
        this.name = name;
        this.password = password;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        Class.forName("com.mysql.jdbc.Driver");
        connection = DriverManager.getConnection(url, name, password);
        LOG.info("connection --- " + connection);
        counter = getRuntimeContext().getMetricGroup().counter("counter");
        insertStatement = connection.prepareStatement(TrafficConstants.FLOWSQL);
    }

    @Override
    public void invoke(BillInfo value, Context context) throws Exception {
        try {
            insertStatement.setString(1, value.getSerial_number());
            insertStatement.setString(2, value.getUser_id());
            insertStatement.setString(3, value.getIntegrate_item_code());
            insertStatement.setString(4, value.getFee());
            insertStatement.setString(5, value.getCity_code());
            counter.inc(1);
            insertStatement.execute();
        } catch (Exception e) {
            LOG.info("invoke --- " + connection);
            LOG.error(e.getMessage());
            throw new Exception(e);
        }
    }

    @Override
    public void close() throws Exception {
        super.close();
        if (insertStatement != null) {
            insertStatement.close();
        }
        if (connection != null) {
            connection.close();
        }
    }
}

When the job runs, it hangs at Class.forName("org.apache.hive.jdbc.HiveDriver"); or at Class.forName("com.mysql.jdbc.Driver");.

[hidden email]

From: JasonLee
Sent: 2020-07-08 18:46
To: user-zh
Subject: Re: About initializing database connections in a RichFunction

hi
Where exactly does it get stuck? You can add logging to locate the spot; in theory this should not happen. Also, does it work when each driver is loaded on its own?

JasonLee
Email: [hidden email]
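If the job really blocks inside Class.forName, a thread dump of the TaskManager (e.g. with jstack) should show the stuck thread inside a static initializer, which would confirm a class-initialization deadlock. Note also that since JDBC 4.0, DriverManager discovers drivers on the classpath by itself via ServiceLoader, so the explicit Class.forName calls are usually unnecessary. Below is a minimal sketch of HiveFilterFunction.open() without them, using the same fields as above; whether this avoids the hang depends on where the deadlock actually sits, so treat it as a diagnostic experiment rather than a confirmed fix:

@Override
public void open(Configuration parameters) throws Exception {
    super.open(parameters);
    // No Class.forName here: with JDBC 4.0+ drivers on the classpath,
    // DriverManager registers them itself via ServiceLoader on first use.
    connection = DriverManager.getConnection(jdbcUrl, username, password);
    LOG.info("hive connection --- " + connection);
    sts = connection.createStatement();
    query();
}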
Hi,
You could first write a unit test that loads the two drivers in the same JVM, to rule out a problem with the database drivers and connections themselves.

Best,
Yichao Yang
From my testing, the problem appears when the parallelism changes between Flink operators; if the parallelism is the same everywhere, there is no problem. Test code below. (Things have been busy lately, hence the slow reply.)

The overall parallelism is 1; if I change the parallelism to 2 after the map, the program hangs and stops executing. (In my real job very little data is left after filtering and processing, so the sink parallelism is 1.)

import org.apache.flink.api.common.functions.RichFilterFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

public class TimerMain4 {

    public static void main(String[] args) throws Exception {
        Logger LOG = LoggerFactory.getLogger(TimerMain4.class);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
        env.addSource(new MySourceTuple())
                .filter(new RichFilterFunction<Tuple2<String, Long>>() {
                    private transient volatile Statement sts1;
                    private transient volatile Connection conn1;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        super.open(parameters);
                        Class.forName("org.apache.hive.jdbc.HiveDriver");
                        conn1 = DriverManager.getConnection("", "", "");
                        LOG.info("connection --- " + conn1);
                        sts1 = conn1.createStatement();
                    }

                    @Override
                    public boolean filter(Tuple2<String, Long> value) {
                        return true;
                    }

                    @Override
                    public void close() throws Exception {
                        super.close();
                        sts1.close();
                        conn1.close();
                    }
                })
                .map(Tuple2::toString)
                .setParallelism(1) // changing this to 2 reproduces the hang
                .addSink(new RichSinkFunction<String>() {
                    private transient volatile PreparedStatement sts2;
                    private transient volatile Connection conn2;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        super.open(parameters);
                        Class.forName("com.mysql.jdbc.Driver");
                        conn2 = DriverManager.getConnection("", "", "");
                        LOG.info("connection --- " + conn2);
                        sts2 = conn2.prepareStatement("");
                    }

                    @Override
                    public void close() throws Exception {
                        super.close();
                        sts2.close();
                        conn2.close();
                    }

                    @Override
                    public void invoke(String value, Context context) {
                        LOG.info(value);
                    }
                }).setParallelism(1);
        env.execute();
    }
}

class MySourceTuple implements SourceFunction<Tuple2<String, Long>> {

    private volatile boolean isRunning = true;
    List<String> names = new ArrayList<>();
    private final Random random = new Random();
    Long number = 1L;

    @Override
    public void run(SourceContext<Tuple2<String, Long>> ctx) throws Exception {
        names.add("张");
        names.add("王");
        names.add("李");
        names.add("赵");
        while (isRunning) {
            int index = random.nextInt(4);
            ctx.collect(new Tuple2<>(names.get(index), number));
            number += 1;
            Thread.sleep(1000);
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }
}

[hidden email]
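A hang inside Class.forName for a JDBC driver generally means a thread is blocked in class static initialization, and that can deadlock when two threads trigger interdependent static initializers (DriverManager's and a driver's) at the same time — which would fit the observation that it only appears once the operators run in separate task threads. As an experiment, one defensive pattern is to initialize DriverManager up front and serialize driver loading per JVM. The helper below is this note's own sketch, not Flink or JDK API:

import java.sql.DriverManager;

// Hypothetical helper: force DriverManager to initialize first, then load
// JDBC driver classes one at a time per JVM.
public final class JdbcDrivers {

    static {
        // Touching DriverManager runs its static initializer (including the
        // ServiceLoader scan for drivers) before any explicit driver load.
        DriverManager.getDrivers();
    }

    private JdbcDrivers() {
    }

    // Synchronized so two task threads never initialize driver classes
    // concurrently.
    public static synchronized void load(String driverClass) throws ClassNotFoundException {
        Class.forName(driverClass);
    }
}

Each open() would then call JdbcDrivers.load("org.apache.hive.jdbc.HiveDriver") or JdbcDrivers.load("com.mysql.jdbc.Driver") instead of calling Class.forName directly. A thread dump taken while the job is stuck would show whether this is really the deadlock in play.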