从kafka读取数据转化为Pojo类型的数据流注册成表,新消息来了,注册表的查询结果不打印

classic Classic list List threaded Threaded
1 message Options
Reply | Threaded
Open this post in threaded view
|

从kafka读取数据转化为Pojo类型的数据流注册成表,新消息来了,注册表的查询结果不打印

yang zhang
请教下开发者:
从kafka读取数据转化为Pojo类型的数据流注册成表,新消息来了,注册表的查询结果不打印

是不支持pojo格式流注册成表吗?只能是Row格式吗?

下面是代码

        //1.创建执行环境
        StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();

        //全局参数设置
        streamEnv.getConfig().setGlobalJobParameters(parameters2);

        //table env
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(streamEnv);


        //2.读取kafka | other source

// DataStream<String> dataStream = null;

// if ("kafka".equalsIgnoreCase(sourceType)) {

            //用jsonString反序列化

// dataStream = FlinkUtils.createKafkaStream(parameters2, SimpleStringSchema.class);

// }

        //###############定义消费kafka source##############
        Properties props = new Properties();
        //指定Ka fka的Broker地址
        props.setProperty("bootstrap.servers", parameters2.getRequired("bootstrap.servers"));
        //指定组ID
        props.setProperty("group.id", parameters2.get("group.id"));
        //如果没有记录偏移量,第一次从最开始消费
// props.setProperty("auto.offset.reset", parameters.get("auto.offset.reset","earliest"));
        //kafka的消费者不自动提交偏移量
        props.setProperty("enable.auto.commit", parameters2.get("enable.auto.commit","false"));

        List<String> topics = Arrays.asList(parameters2.get("topics").split(","));



        //new KafkaSource instance
        FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<String>(
                topics,
                SimpleStringSchema.class.newInstance(),
                props);

        //得到kafka流
        DataStreamSource<String> dataStream = streamEnv.addSource(kafkaConsumer);

        //3.映射为实体
        SingleOutputStreamOperator map = dataStream.map(new Map2EntityFunction()).returns(Class.forName(sourceClass));

        //4.注册一个实例获取column names
        Class<?> clz = Class.forName(sourceClass);
        Object vo = clz.newInstance();
        StringBuilder columnBuilder = new StringBuilder();
        Field[] declaredFields = vo.getClass().getDeclaredFields();
        for (int i = 0; i < declaredFields.length; i++) {
            String fieldName = declaredFields[i].getName();
            columnBuilder.append(fieldName);
            if (i < declaredFields.length - 1) {
                columnBuilder.append(",");
            }
        }

        String fieldsDeclare = columnBuilder.toString();

        System.err.println(fieldsDeclare);

        //5.注册数据表 --注意! 【这里的表名和字段名需要和待处理的执行表达式对应上,对应不上查询会报错】
        tEnv.registerDataStream(sourceName, map,fieldsDeclare);

        //6.执行语句
        Table table = tEnv.sqlQuery(executiveSql);

        //7.print
        tEnv.toAppendStream(table, Row.class).print();//运行时这里不会打印出结果

   
        //8.execute
        streamEnv.execute(jobName);
 
 
 
 
-------------------------------------------------------------------------------------------------------------------------------
 
/**
 *
 * 根据传入的映射类返回一个通用的POJO流
 */
public class Map2EntityFunction<T> extends RichMapFunction<String, T> {


    @Override
    public T map(String s) throws Exception {
        System.err.println("receive kafka msg--->"+s); //每次收到消息这里会打印
        ParameterTool params = (ParameterTool) getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
        String sourceClass = params.getRequired("sourceClass");
        Preconditions.checkNotNull(sourceClass);
        Class<T> clz = (Class<T>) Class.forName(sourceClass);
        return JsonUtil.json2object(s, clz);
    }


}
 
---------------------------------------------------------------------------------------------------------------------------------------