从kafka读取数据转化为Pojo类型的数据流注册成表,新消息来了,注册表的查询结果不打印

classic Classic list List threaded Threaded
2 messages Options
Reply | Threaded
Open this post in threaded view
|

从kafka读取数据转化为Pojo类型的数据流注册成表,新消息来了,注册表的查询结果不打印

yang zhang
从kafka读取数据转化为Pojo类型的数据流注册成表,新消息来了,注册表的查询结果不打印

请问是不支持pojo流注册表吗?只能是Row类型吗?

下面是相关代码



        //1.创建执行环境
        StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();

        //全局参数设置
        streamEnv.getConfig().setGlobalJobParameters(parameters2);

        //table env
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(streamEnv);


        //2.读取kafka | other source

//        DataStream<String> dataStream = null;

//        if ("kafka".equalsIgnoreCase(sourceType)) {

            //用jsonString反序列化

//            dataStream = FlinkUtils.createKafkaStream(parameters2, SimpleStringSchema.class);

//        }

        //###############定义消费kafka source##############
        Properties props = new Properties();
        //指定Ka fka的Broker地址
        props.setProperty("bootstrap.servers", parameters2.getRequired("bootstrap.servers"));
        //指定组ID
        props.setProperty("group.id", parameters2.get("group.id"));
        //如果没有记录偏移量,第一次从最开始消费
//        props.setProperty("auto.offset.reset", parameters.get("auto.offset.reset","earliest"));
        //kafka的消费者不自动提交偏移量
        props.setProperty("enable.auto.commit", parameters2.get("enable.auto.commit","false"));

        List<String> topics = Arrays.asList(parameters2.get("topics").split(","));



        //new KafkaSource instance
        FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<String>(
                topics,
                SimpleStringSchema.class.newInstance(),
                props);

        //得到kafka流
        DataStreamSource<String> dataStream = streamEnv.addSource(kafkaConsumer);

        //3.映射为实体
        SingleOutputStreamOperator map = dataStream.map(new Map2EntityFunction()).returns(Class.forName(sourceClass));

        //4.注册一个实例获取column names
        Class<?> clz = Class.forName(sourceClass);
        Object vo = clz.newInstance();
        StringBuilder columnBuilder = new StringBuilder();
        Field[] declaredFields = vo.getClass().getDeclaredFields();
        for (int i = 0; i < declaredFields.length; i++) {
            String fieldName = declaredFields[i].getName();
            columnBuilder.append(fieldName);
            if (i < declaredFields.length - 1) {
                columnBuilder.append(",");
            }
        }

        String fieldsDeclare = columnBuilder.toString();

        System.err.println(fieldsDeclare);

        //5.注册数据表 --注意! 【这里的表名和字段名需要和待处理的执行表达式对应上,对应不上查询会报错】
        tEnv.registerDataStream(sourceName, map,fieldsDeclare);

        //6.执行语句
        Table table = tEnv.sqlQuery(executiveSql);

        //7.print
        tEnv.toAppendStream(table, Row.class).print();//运行时这里不会打印出结果

   
        //8.execute
        streamEnv.execute(jobName);
               
               
               
               
-------------------------------------------------------------------------------------------------------------------------------
               
/**
 *
 * 根据传入的映射类返回一个通用的POJO流
 */
public class Map2EntityFunction<T> extends RichMapFunction<String, T> {


    @Override
    public T map(String s) throws Exception {
        System.err.println("receive kafka msg--->"+s); //每次收到消息这里会打印
        ParameterTool params = (ParameterTool) getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
        String sourceClass = params.getRequired("sourceClass");
        Preconditions.checkNotNull(sourceClass);
        Class<T> clz = (Class<T>) Class.forName(sourceClass);
        return JsonUtil.json2object(s, clz);
    }


}
               
---------------------------------------------------------------------------------------------------------------------------------------
Reply | Threaded
Open this post in threaded view
|

Re: 从kafka读取数据转化为Pojo类型的数据流注册成表,新消息来了,注册表的查询结果不打印

yang zhang
我知道了,我的查询sql条件的问题,已经改好了。
谢谢

发自我的iPhone

> 在 2020年8月25日,16:12,yang zhang <[hidden email]> 写道:
>
> 从kafka读取数据转化为Pojo类型的数据流注册成表,新消息来了,注册表的查询结果不打印
>
> 请问是不支持pojo流注册表吗?只能是Row类型吗?
>
> 下面是相关代码
>
>
>
>        //1.创建执行环境
>        StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
>
>        //全局参数设置
>        streamEnv.getConfig().setGlobalJobParameters(parameters2);
>
>        //table env
>        StreamTableEnvironment tEnv = StreamTableEnvironment.create(streamEnv);
>
>
>        //2.读取kafka | other source
>
> //        DataStream<String> dataStream = null;
>
> //        if ("kafka".equalsIgnoreCase(sourceType)) {
>
>            //用jsonString反序列化
>
> //            dataStream = FlinkUtils.createKafkaStream(parameters2, SimpleStringSchema.class);
>
> //        }
>
>        //###############定义消费kafka source##############
>        Properties props = new Properties();
>        //指定Ka fka的Broker地址
>        props.setProperty("bootstrap.servers", parameters2.getRequired("bootstrap.servers"));
>        //指定组ID
>        props.setProperty("group.id", parameters2.get("group.id"));
>        //如果没有记录偏移量,第一次从最开始消费
> //        props.setProperty("auto.offset.reset", parameters.get("auto.offset.reset","earliest"));
>        //kafka的消费者不自动提交偏移量
>        props.setProperty("enable.auto.commit", parameters2.get("enable.auto.commit","false"));
>
>        List<String> topics = Arrays.asList(parameters2.get("topics").split(","));
>
>
>
>        //new KafkaSource instance
>        FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<String>(
>                topics,
>                SimpleStringSchema.class.newInstance(),
>                props);
>
>        //得到kafka流
>        DataStreamSource<String> dataStream = streamEnv.addSource(kafkaConsumer);
>
>        //3.映射为实体
>        SingleOutputStreamOperator map = dataStream.map(new Map2EntityFunction()).returns(Class.forName(sourceClass));
>
>        //4.注册一个实例获取column names
>        Class<?> clz = Class.forName(sourceClass);
>        Object vo = clz.newInstance();
>        StringBuilder columnBuilder = new StringBuilder();
>        Field[] declaredFields = vo.getClass().getDeclaredFields();
>        for (int i = 0; i < declaredFields.length; i++) {
>            String fieldName = declaredFields[i].getName();
>            columnBuilder.append(fieldName);
>            if (i < declaredFields.length - 1) {
>                columnBuilder.append(",");
>            }
>        }
>
>        String fieldsDeclare = columnBuilder.toString();
>
>        System.err.println(fieldsDeclare);
>
>        //5.注册数据表 --注意! 【这里的表名和字段名需要和待处理的执行表达式对应上,对应不上查询会报错】
>        tEnv.registerDataStream(sourceName, map,fieldsDeclare);
>
>        //6.执行语句
>        Table table = tEnv.sqlQuery(executiveSql);
>
>        //7.print
>        tEnv.toAppendStream(table, Row.class).print();//运行时这里不会打印出结果
>
>
>        //8.execute
>        streamEnv.execute(jobName);
>        
>        
>        
>        
> -------------------------------------------------------------------------------------------------------------------------------
>        
> /**
> *
> * 根据传入的映射类返回一个通用的POJO流
> */
> public class Map2EntityFunction<T> extends RichMapFunction<String, T> {
>
>
>    @Override
>    public T map(String s) throws Exception {
>        System.err.println("receive kafka msg--->"+s); //每次收到消息这里会打印
>        ParameterTool params = (ParameterTool) getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
>        String sourceClass = params.getRequired("sourceClass");
>        Preconditions.checkNotNull(sourceClass);
>        Class<T> clz = (Class<T>) Class.forName(sourceClass);
>        return JsonUtil.json2object(s, clz);
>    }
>
>
> }
>        
> ---------------------------------------------------------------------------------------------------------------------------------------