代码:
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setRuntimeMode(RuntimeExecutionMode.BATCH) // 在DataStream
API上以批处理方式执行
// 本地测试文件
val inputStream =
env.readTextFile(getClass.getResource("/hello.txt").getPath)
// 分词统计,问题:批处理模式的时候,sum 为 1 的单词不会被打印
val resultStream = inputStream
.flatMap(_.split(","))
.filter(_.nonEmpty)
.map((_, 1))
.keyBy(_._1)
.sum(1)
resultStream.print()
env.execute("word count")
测试文件的数据内容:
hello,flink
hello,flink
hello,hive
hello,hive
hello,hbase
hello,hbase
hello,scala
hello,kafka
hello,kafka
测试结果:hello/flink/hive/hbase/kafka的和大于1,会打印出来;但是 scala的个数为1,不会被打印出来
--
Sent from:
http://apache-flink.147419.n8.nabble.com/