从kafka读取数据转化为Pojo类型的数据流注册成表,新消息来了,注册表的查询结果不打印
请问是不支持pojo流注册表吗?只能是Row类型吗? 下面是相关代码 //1.创建执行环境 StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment(); //全局参数设置 streamEnv.getConfig().setGlobalJobParameters(parameters2); //table env StreamTableEnvironment tEnv = StreamTableEnvironment.create(streamEnv); //2.读取kafka | other source // DataStream<String> dataStream = null; // if ("kafka".equalsIgnoreCase(sourceType)) { //用jsonString反序列化 // dataStream = FlinkUtils.createKafkaStream(parameters2, SimpleStringSchema.class); // } //###############定义消费kafka source############## Properties props = new Properties(); //指定Ka fka的Broker地址 props.setProperty("bootstrap.servers", parameters2.getRequired("bootstrap.servers")); //指定组ID props.setProperty("group.id", parameters2.get("group.id")); //如果没有记录偏移量,第一次从最开始消费 // props.setProperty("auto.offset.reset", parameters.get("auto.offset.reset","earliest")); //kafka的消费者不自动提交偏移量 props.setProperty("enable.auto.commit", parameters2.get("enable.auto.commit","false")); List<String> topics = Arrays.asList(parameters2.get("topics").split(",")); //new KafkaSource instance FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<String>( topics, SimpleStringSchema.class.newInstance(), props); //得到kafka流 DataStreamSource<String> dataStream = streamEnv.addSource(kafkaConsumer); //3.映射为实体 SingleOutputStreamOperator map = dataStream.map(new Map2EntityFunction()).returns(Class.forName(sourceClass)); //4.注册一个实例获取column names Class<?> clz = Class.forName(sourceClass); Object vo = clz.newInstance(); StringBuilder columnBuilder = new StringBuilder(); Field[] declaredFields = vo.getClass().getDeclaredFields(); for (int i = 0; i < declaredFields.length; i++) { String fieldName = declaredFields[i].getName(); columnBuilder.append(fieldName); if (i < declaredFields.length - 1) { columnBuilder.append(","); } } String fieldsDeclare = columnBuilder.toString(); System.err.println(fieldsDeclare); //5.注册数据表 --注意! 【这里的表名和字段名需要和待处理的执行表达式对应上,对应不上查询会报错】 tEnv.registerDataStream(sourceName, map,fieldsDeclare); //6.执行语句 Table table = tEnv.sqlQuery(executiveSql); //7.print tEnv.toAppendStream(table, Row.class).print();//运行时这里不会打印出结果 //8.execute streamEnv.execute(jobName); ------------------------------------------------------------------------------------------------------------------------------- /** * * 根据传入的映射类返回一个通用的POJO流 */ public class Map2EntityFunction<T> extends RichMapFunction<String, T> { @Override public T map(String s) throws Exception { System.err.println("receive kafka msg--->"+s); //每次收到消息这里会打印 ParameterTool params = (ParameterTool) getRuntimeContext().getExecutionConfig().getGlobalJobParameters(); String sourceClass = params.getRequired("sourceClass"); Preconditions.checkNotNull(sourceClass); Class<T> clz = (Class<T>) Class.forName(sourceClass); return JsonUtil.json2object(s, clz); } } --------------------------------------------------------------------------------------------------------------------------------------- |
我知道了,我的查询sql条件的问题,已经改好了。
谢谢 发自我的iPhone > 在 2020年8月25日,16:12,yang zhang <[hidden email]> 写道: > > 从kafka读取数据转化为Pojo类型的数据流注册成表,新消息来了,注册表的查询结果不打印 > > 请问是不支持pojo流注册表吗?只能是Row类型吗? > > 下面是相关代码 > > > > //1.创建执行环境 > StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment(); > > //全局参数设置 > streamEnv.getConfig().setGlobalJobParameters(parameters2); > > //table env > StreamTableEnvironment tEnv = StreamTableEnvironment.create(streamEnv); > > > //2.读取kafka | other source > > // DataStream<String> dataStream = null; > > // if ("kafka".equalsIgnoreCase(sourceType)) { > > //用jsonString反序列化 > > // dataStream = FlinkUtils.createKafkaStream(parameters2, SimpleStringSchema.class); > > // } > > //###############定义消费kafka source############## > Properties props = new Properties(); > //指定Ka fka的Broker地址 > props.setProperty("bootstrap.servers", parameters2.getRequired("bootstrap.servers")); > //指定组ID > props.setProperty("group.id", parameters2.get("group.id")); > //如果没有记录偏移量,第一次从最开始消费 > // props.setProperty("auto.offset.reset", parameters.get("auto.offset.reset","earliest")); > //kafka的消费者不自动提交偏移量 > props.setProperty("enable.auto.commit", parameters2.get("enable.auto.commit","false")); > > List<String> topics = Arrays.asList(parameters2.get("topics").split(",")); > > > > //new KafkaSource instance > FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<String>( > topics, > SimpleStringSchema.class.newInstance(), > props); > > //得到kafka流 > DataStreamSource<String> dataStream = streamEnv.addSource(kafkaConsumer); > > //3.映射为实体 > SingleOutputStreamOperator map = dataStream.map(new Map2EntityFunction()).returns(Class.forName(sourceClass)); > > //4.注册一个实例获取column names > Class<?> clz = Class.forName(sourceClass); > Object vo = clz.newInstance(); > StringBuilder columnBuilder = new StringBuilder(); > Field[] declaredFields = vo.getClass().getDeclaredFields(); > for (int i = 0; i < declaredFields.length; i++) { > String fieldName = declaredFields[i].getName(); > columnBuilder.append(fieldName); > if (i < declaredFields.length - 1) { > columnBuilder.append(","); > } > } > > String fieldsDeclare = columnBuilder.toString(); > > System.err.println(fieldsDeclare); > > //5.注册数据表 --注意! 【这里的表名和字段名需要和待处理的执行表达式对应上,对应不上查询会报错】 > tEnv.registerDataStream(sourceName, map,fieldsDeclare); > > //6.执行语句 > Table table = tEnv.sqlQuery(executiveSql); > > //7.print > tEnv.toAppendStream(table, Row.class).print();//运行时这里不会打印出结果 > > > //8.execute > streamEnv.execute(jobName); > > > > > ------------------------------------------------------------------------------------------------------------------------------- > > /** > * > * 根据传入的映射类返回一个通用的POJO流 > */ > public class Map2EntityFunction<T> extends RichMapFunction<String, T> { > > > @Override > public T map(String s) throws Exception { > System.err.println("receive kafka msg--->"+s); //每次收到消息这里会打印 > ParameterTool params = (ParameterTool) getRuntimeContext().getExecutionConfig().getGlobalJobParameters(); > String sourceClass = params.getRequired("sourceClass"); > Preconditions.checkNotNull(sourceClass); > Class<T> clz = (Class<T>) Class.forName(sourceClass); > return JsonUtil.json2object(s, clz); > } > > > } > > --------------------------------------------------------------------------------------------------------------------------------------- |
Free forum by Nabble | Edit this page |