Question about initializing database connections in RichFunction


Question about initializing database connections in RichFunction

1101300123
Hello,
        When I write a program with the Flink 1.10.1 DataStream API and call Class.forName("*****") in two different RichFunctions to load two different database drivers, the program hangs and stops making progress. Has anyone run into this?


[hidden email]

Re: Question about initializing database connections in RichFunction

Yichao Yang
Hi,


At which step does the problem occur? Could you provide some of the following to help locate it?
1. A screenshot or the logs
2. Which databases are involved, and which versions
3. Whether a standalone test case that loads both databases succeeds (see the sketch below)
4. Pseudocode of the job
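
For reference, a minimal version of the check in item 3 could look like the sketch below. It is only an illustration: the class name is made up, the commented-out connection strings are placeholders, and only the two driver class names come from this thread.

public class DualDriverLoadTest {
    public static void main(String[] args) throws Exception {
        // Load both drivers in the same JVM, one after the other, outside of Flink.
        Class.forName("org.apache.hive.jdbc.HiveDriver");
        System.out.println("Hive driver loaded");
        Class.forName("com.mysql.jdbc.Driver");
        System.out.println("MySQL driver loaded");
        // Optionally also open real connections (fill in your own endpoints):
        // java.sql.Connection hive = java.sql.DriverManager.getConnection("jdbc:hive2://...", "user", "pwd");
        // java.sql.Connection mysql = java.sql.DriverManager.getConnection("jdbc:mysql://...", "user", "pwd");
    }
}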


Best,
Yichao Yang



Re: Re: Question about initializing database connections in RichFunction

1101300123
In reply to this post by 1101300123

FlinkKafkaConsumer<Bill> kafkaConsumer = new FlinkKafkaConsumer<>(TrafficConstants.BILLTOPIC,new SchargeConsumerSchema(), props);
kafkaConsumer.setStartFromLatest();
SingleOutputStreamOperator<Bill> process = env.addSource(kafkaConsumer).setParallelism(4)
.filter(new HiveFilterFunction(TrafficConstants.HIVEURL, TrafficConstants.HIVEUSERNAME, TrafficConstants.HIVEPASSWORD)).name("流量费过滤")
.keyBy((KeySelector<Bill, String>) value -> value.getUser_id() + value.getSerial_number() + value.getProvince_code())
.process(***);
SingleOutputStreamOperator<BillInfo> map = process.map();
map.addSink(new RdsFlowSink(TrafficConstants.URL, TrafficConstants.USERNAME, TrafficConstants.PASSWORD))
.setParallelism(1).name("sinkRds");

This is the main logic: read from Kafka --> a custom rich filter function loads Hive dimension-table data to filter the records --> keyBy --> process --> a custom sink function

public class HiveFilterFunction extends RichFilterFunction<Bill> {
    Logger LOG = LoggerFactory.getLogger(HiveFilterFunction.class);
    private final String jdbcUrl;
    private final String username;
    private final String password;
    private transient volatile Statement sts;
    private transient volatile Connection connection;
    Map<String, String> map = new ConcurrentHashMap<>();

    public HiveFilterFunction(String jdbcUrl, String username, String password) {
        this.jdbcUrl = jdbcUrl;
        this.username = username;
        this.password = password;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        Class.forName("org.apache.hive.jdbc.HiveDriver");
        connection = DriverManager.getConnection(jdbcUrl, username, password);
        LOG.info("hive connection --- " + connection);
        sts = connection.createStatement();
        query();
    }

    @Override
    public boolean filter(Bill value) {
        return map.containsKey(value.getIntegrate_item_code())
                && TrafficConstants.getProCode().contains(value.getProvince_code());
    }

    @Override
    public void close() throws Exception {
        super.close();
        assert null != sts ;
        assert null != connection ;
        sts.close();
        connection.close();
    }

    private void query() throws Exception {
        ResultSet resultSet = null;
        try {
            sts.execute(TrafficConstants.SETSQL);
            resultSet = sts.executeQuery(TrafficConstants.CODESQL);
            while (resultSet.next()) {
                map.put(resultSet.getString("charge_code_cbss"), "");
            }
        } catch (Exception e) {
            LOG.error("hive error", e);
            throw new Exception(e);
        } finally {
            assert resultSet != null;
            resultSet.close();
        }
        LOG.info("hive 维表数据加载完成");
    }
}

public class RdsFlowSink extends RichSinkFunction<BillInfo>{
    Logger LOG = LoggerFactory.getLogger(RdsFlowSink.class);
    private final String url;
    private final String name;
    private final String password;

    private transient volatile PreparedStatement insertStatement;
    private transient volatile Connection connection;
    private transient volatile Counter counter = null;
 
    public RdsFlowSink(String url, String name, String password) {
        this.url = url;
        this.name = name;
        this.password = password;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        Class.forName("com.mysql.jdbc.Driver");
        connection = DriverManager.getConnection(url,name,password);
        LOG.info("connection --- " + connection);
        counter = getRuntimeContext().getMetricGroup().counter("counter");
        insertStatement = connection.prepareStatement(TrafficConstants.FLOWSQL);
     
    }

    @Override
    public void invoke(BillInfo value, Context context) throws Exception {
        try {
            insertStatement.setString(1,value.getSerial_number());
            insertStatement.setString(2,value.getUser_id());
            insertStatement.setString(3,value.getIntegrate_item_code());
            insertStatement.setString(4,value.getFee());
            insertStatement.setString(5,value.getCity_code());
            counter.inc(1);
            insertStatement.execute();
         
        }catch (Exception e){      
            LOG.info("invoke  --- " + connection);
            LOG.error(e.getMessage());
            throw new Exception(e);
        }
    }

    @Override
    public void close() throws Exception {
        super.close();
        assert insertStatement != null;
        assert connection != null;
        insertStatement.close();
        connection.close();
    }
}

When it runs, the program gets stuck at Class.forName("org.apache.hive.jdbc.HiveDriver"); or at Class.forName("com.mysql.jdbc.Driver");
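
As a side note, if both driver jars are JDBC 4.0+ drivers that ship a META-INF/services/java.sql.Driver entry (recent MySQL Connector/J does; check the hive-jdbc jar you are using), DriverManager discovers them on its own and the explicit Class.forName call can simply be dropped. A sketch of HiveFilterFunction.open() without it, everything else unchanged:

@Override
public void open(Configuration parameters) throws Exception {
    super.open(parameters);
    // No Class.forName: JDBC 4 drivers on the classpath register themselves
    // through the ServiceLoader mechanism when DriverManager is first used.
    connection = DriverManager.getConnection(jdbcUrl, username, password);
    LOG.info("hive connection --- " + connection);
    sts = connection.createStatement();
    query();
}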


[hidden email]
 
From: JasonLee
Sent: 2020-07-08 18:46
To: user-zh
Subject: Re: Question about initializing database connections in RichFunction
hi
Where exactly does it get stuck? You could print some logs to narrow it down; in theory this should not happen. Also, does it work if each one runs on its own?
 
 

Re: Re: Question about initializing database connections in RichFunction

Yichao Yang
Hi,


You could first write a unit test to check whether both databases can be loaded at the same time, to rule out a problem with the database connections themselves.


Best,
Yichao Yang





Re: Re: Question about initializing database connections in RichFunction, causing the program to hang and stop executing

1101300123
In my testing I found that the problem appears when the parallelism changes between Flink operators; if the parallelism is the same everywhere, there is no problem. Test code is below. (Sorry for the slow reply.)
With the overall parallelism set to 1, changing the parallelism to 2 after the map makes the program hang and stop executing; a possible workaround is sketched after the test code.
(In my actual job very little data is left after filtering and computation, so the sink parallelism is 1.)


import org.apache.flink.api.common.functions.RichFilterFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

public class TimerMain4 {
    public static void main(String[] args) throws Exception {
        Logger LOG = LoggerFactory.getLogger(TimerMain4.class);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
        env.addSource(new MySourceTuple())
                .filter(new RichFilterFunction<Tuple2<String, Long>>() {
                    private transient volatile Statement sts1;
                    private transient volatile Connection conn1;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        super.open(parameters);
                        Class.forName("org.apache.hive.jdbc.HiveDriver");
                        conn1 = DriverManager.getConnection("", "", "");
                        LOG.info("connection --- " + conn1);
                        sts1 = conn1.createStatement();
                    }

                    @Override
                    public boolean filter(Tuple2<String, Long> value) {
                        return true;
                    }

                    @Override
                    public void close() throws Exception {
                        super.close();
                        sts1.close();
                        conn1.close();
                    }
                })
                .map(Tuple2::toString)
                .setParallelism(1)
                .addSink(new RichSinkFunction<String>() {
                    private transient volatile PreparedStatement sts2;
                    private transient volatile Connection conn2;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        super.open(parameters);
                        Class.forName("com.mysql.jdbc.Driver");
                        conn2 = DriverManager.getConnection("", "", "");
                        LOG.info("connection --- " + conn2);
                        sts2 = conn2.prepareStatement("");
                    }

                    @Override
                    public void close() throws Exception {
                        super.close();
                        sts2.close();
                        conn2.close();
                    }

                    @Override
                    public void invoke(String value, Context context) {
                        LOG.info(value);
                    }
                }).setParallelism(1);
        env.execute();
    }

}

class MySourceTuple implements SourceFunction<Tuple2<String, Long>> {

    private volatile boolean isRunning = true;
    List<String> names = new ArrayList<>();

    private final Random random = new Random();
    Long number = 1L;

    @Override
    public void run(SourceContext<Tuple2<String, Long>> ctx) throws Exception {
        names.add("张");
        names.add("王");
        names.add("李");
        names.add("赵");
        while (isRunning) {
            int index = random.nextInt(4);
            ctx.collect(new Tuple2<>(names.get(index), number));
            number += 1;
            Thread.sleep(1000);
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }
}
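
One guess at the cause, not confirmed in this thread: when the parallelism differs, the filter and the sink are no longer chained into one task, so their open() methods run concurrently in separate task threads of the same TaskManager JVM, and the two drivers plus java.sql.DriverManager can then be class-initialized at the same time, which can end in a class-initialization deadlock. A jstack dump of the TaskManager taken while it hangs would show which monitor each open() thread is blocked on. If that is the cause, serializing driver loading through one shared lock should avoid it. The helper below is a hypothetical sketch, not code from this thread; in open() the direct calls would become JdbcDrivers.load("org.apache.hive.jdbc.HiveDriver") and JdbcDrivers.load("com.mysql.jdbc.Driver").

public final class JdbcDrivers {
    // One lock shared by all operators loaded through the same user-code classloader,
    // so two task threads never class-initialize different JDBC drivers at the same time.
    private static final Object LOCK = new Object();

    private JdbcDrivers() {
    }

    public static void load(String driverClassName) throws ClassNotFoundException {
        synchronized (LOCK) {
            Class.forName(driverClassName);
        }
    }
}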




[hidden email]
 