请教下开发者:
从kafka读取数据转化为Pojo类型的数据流注册成表,新消息来了,注册表的查询结果不打印 是不支持pojo格式流注册成表吗?只能是Row格式吗? 下面是代码 //1.创建执行环境 StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment(); //全局参数设置 streamEnv.getConfig().setGlobalJobParameters(parameters2); //table env StreamTableEnvironment tEnv = StreamTableEnvironment.create(streamEnv); //2.读取kafka | other source // DataStream<String> dataStream = null; // if ("kafka".equalsIgnoreCase(sourceType)) { //用jsonString反序列化 // dataStream = FlinkUtils.createKafkaStream(parameters2, SimpleStringSchema.class); // } //###############定义消费kafka source############## Properties props = new Properties(); //指定Ka fka的Broker地址 props.setProperty("bootstrap.servers", parameters2.getRequired("bootstrap.servers")); //指定组ID props.setProperty("group.id", parameters2.get("group.id")); //如果没有记录偏移量,第一次从最开始消费 // props.setProperty("auto.offset.reset", parameters.get("auto.offset.reset","earliest")); //kafka的消费者不自动提交偏移量 props.setProperty("enable.auto.commit", parameters2.get("enable.auto.commit","false")); List<String> topics = Arrays.asList(parameters2.get("topics").split(",")); //new KafkaSource instance FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<String>( topics, SimpleStringSchema.class.newInstance(), props); //得到kafka流 DataStreamSource<String> dataStream = streamEnv.addSource(kafkaConsumer); //3.映射为实体 SingleOutputStreamOperator map = dataStream.map(new Map2EntityFunction()).returns(Class.forName(sourceClass)); //4.注册一个实例获取column names Class<?> clz = Class.forName(sourceClass); Object vo = clz.newInstance(); StringBuilder columnBuilder = new StringBuilder(); Field[] declaredFields = vo.getClass().getDeclaredFields(); for (int i = 0; i < declaredFields.length; i++) { String fieldName = declaredFields[i].getName(); columnBuilder.append(fieldName); if (i < declaredFields.length - 1) { columnBuilder.append(","); } } String fieldsDeclare = columnBuilder.toString(); System.err.println(fieldsDeclare); //5.注册数据表 --注意! 【这里的表名和字段名需要和待处理的执行表达式对应上,对应不上查询会报错】 tEnv.registerDataStream(sourceName, map,fieldsDeclare); //6.执行语句 Table table = tEnv.sqlQuery(executiveSql); //7.print tEnv.toAppendStream(table, Row.class).print();//运行时这里不会打印出结果 //8.execute streamEnv.execute(jobName); ------------------------------------------------------------------------------------------------------------------------------- /** * * 根据传入的映射类返回一个通用的POJO流 */ public class Map2EntityFunction<T> extends RichMapFunction<String, T> { @Override public T map(String s) throws Exception { System.err.println("receive kafka msg--->"+s); //每次收到消息这里会打印 ParameterTool params = (ParameterTool) getRuntimeContext().getExecutionConfig().getGlobalJobParameters(); String sourceClass = params.getRequired("sourceClass"); Preconditions.checkNotNull(sourceClass); Class<T> clz = (Class<T>) Class.forName(sourceClass); return JsonUtil.json2object(s, clz); } } --------------------------------------------------------------------------------------------------------------------------------------- |
Free forum by Nabble | Edit this page |