flink消费Kafka没有数据问题

classic Classic list List threaded Threaded
5 messages Options
Reply | Threaded
Open this post in threaded view
|

flink消费Kafka没有数据问题

sunfulin
我有一个作业,通过Flink去消费神策的Kafka消息数据,参见(https://manual.sensorsdata.cn/sa/latest/page-1573828.html)。但是任务启动之后,Flink任务没有消费任何Kafka消息,我通过神策的kafka console consumer能看到源源不断的数据。
本身flink作业job里没有任何报错,想问下大家这种情况该如何排查?
Reply | Threaded
Open this post in threaded view
|

回复:flink消费Kafka没有数据问题

Evan
第一,查看神策的kafka的配置项advertised.host.name




------------------ 原始邮件 ------------------
发件人:&nbsp;"sunfulin"<[hidden email]&gt;;
发送时间:&nbsp;2020年1月10日(星期五) 上午9:51
收件人:&nbsp;"[hidden email]"<[hidden email]&gt;;

主题:&nbsp;flink消费Kafka没有数据问题



我有一个作业,通过Flink去消费神策的Kafka消息数据,参见(https://manual.sensorsdata.cn/sa/latest/page-1573828.html)。但是任务启动之后,Flink任务没有消费任何Kafka消息,我通过神策的kafka console consumer能看到源源不断的数据。
本身flink作业job里没有任何报错,想问下大家这种情况该如何排查?
Reply | Threaded
Open this post in threaded view
|

回复:flink消费Kafka没有数据问题

Evan
In reply to this post by sunfulin
第一 查看神策kafka的配置advertised.host.name和advertised.port配置
第二 网络配置就按照如下:
启动订阅的机器需与部署神策分析的机器在同一个内网,且必须可以解析神策分析服务器的 host;
请选用兼容的 Kafka 客户端版本,高版本服务端兼容低版本客户端,反之则可能存在兼容性问题。神策 Kafka 服务端版本一部分早期用户为 0.8.2.x,后期新用户为 0.10.x,具体情况可在服务器上查看;
仅私有部署版支持通过 Kafka 订阅数据;
如果在自己的机器上订阅数据,需要先在自己机器上 hosts 中配上神策的服务器,订阅的时候 zookeeper 填写所有机器的 hostname,例如:hostname1:2181,hostname2:2181,hostname3:2181

第三、去神策的kafka下查看一下日志,看有没有外部消费者连接报错信息
第四、有报错信息就解决一下


------------------&nbsp;原始邮件&nbsp;------------------
发件人:&nbsp;"sunfulin"<[hidden email]&gt;;
发送时间:&nbsp;2020年1月10日(星期五) 上午9:51
收件人:&nbsp;"[hidden email]"<[hidden email]&gt;;

主题:&nbsp;flink消费Kafka没有数据问题



我有一个作业,通过Flink去消费神策的Kafka消息数据,参见(https://manual.sensorsdata.cn/sa/latest/page-1573828.html)。但是任务启动之后,Flink任务没有消费任何Kafka消息,我通过神策的kafka console consumer能看到源源不断的数据。
本身flink作业job里没有任何报错,想问下大家这种情况该如何排查?
Reply | Threaded
Open this post in threaded view
|

回复:flink消费Kafka没有数据问题

ZhangChangjun
In reply to this post by sunfulin
可以看下source的并行度是否大于kafka分区数量



---原始邮件---
发件人: "sunfulin"<[hidden email]&gt;
发送时间: 2020年1月10日(周五) 上午9:51
收件人: "[hidden email]"<[hidden email]&gt;;
主题: flink消费Kafka没有数据问题


我有一个作业,通过Flink去消费神策的Kafka消息数据,参见(https://manual.sensorsdata.cn/sa/latest/page-1573828.html)。但是任务启动之后,Flink任务没有消费任何Kafka消息,我通过神策的kafka console consumer能看到源源不断的数据。
本身flink作业job里没有任何报错,想问下大家这种情况该如何排查?
Reply | Threaded
Open this post in threaded view
|

Re:回复:flink消费Kafka没有数据问题

sunfulin
In reply to this post by Evan
感谢回复,排查后确实是hostname的配置问题。


任务还遇到了另外一个问题。下面是读取的Kafka连接配置,使用JSON SCHEMA来解析。不过实际运行时却抛出了如下异常,请问有大神知道是啥原因么?

Caused by: java.lang.NullPointerException: Null result cannot be used for atomic types.

 at DataStreamSinkConversion$5.map(Unknown Source)

 at org.apache.flink.table.runtime.CRowMapRunner.map(CRowMapRunner.scala:55)

 at org.apache.flink.table.runtime.CRowMapRunner.map(CRowMapRunner.scala:34)

 at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)

 at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)

 at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)

 at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)

 at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)

 at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)

 at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)

 at org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.scala:37)

 at org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.scala:28)

 at DataStreamSourceConversion$2.processElement(Unknown Source)

 at org.apache.flink.table.runtime.CRowOutputProcessRunner.processElement(CRowOutputProcessRunner.scala:70)

 at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)

 at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)

 at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)

 at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)

 at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)

 at org.apache.flink.streaming.






tableEnv.connect(

    new Kafka()

                .version(kafkaInstance.getVersion())

                .topic(chooseKafkaTopic(initPack.clusterMode))

                .property("bootstrap.servers", kafkaInstance.getBrokerList())

                .property("group.id", initPack.jobName)

                .startFromEarliest()   // 测试用,上生产可以去掉

).withSchema(

    new Schema()

            // 时间戳字段

            .field("rowtime", Types.SQL_TIMESTAMP).rowtime(

                new Rowtime()

                    .timestampsFromField("time")

                    .watermarksPeriodicBounded(1000)

            )

            .field("type", Types.STRING)

            .field("event", Types.STRING)

            .field("user_id", Types.STRING)

            .field("distinct_id", Types.STRING)

            .field("project", Types.STRING)

            .field("recv_time", Types.SQL_TIMESTAMP)

            .field("properties", Types.ROW_NAMED(

                    new String[] { "BROWSER_VERSION", "pathname", "search", "eventType", "message", "stack", "componentStack" },

                    Types.STRING, Types.STRING, Types.STRING, Types.STRING, Types.STRING, Types.STRING, Types.STRING)

            )

).withFormat(

    new Json().failOnMissingField(false)

            .deriveSchema()

)

.inAppendMode()

.registerTableSource(getTableName());












在 2020-01-10 09:53:52,"Evan" <[hidden email]> 写道:

>第一,查看神策的kafka的配置项advertised.host.name
>
>
>
>
>------------------&nbsp;原始邮件&nbsp;------------------
>发件人:&nbsp;"sunfulin"<[hidden email]&gt;;
>发送时间:&nbsp;2020年1月10日(星期五) 上午9:51
>收件人:&nbsp;"[hidden email]"<[hidden email]&gt;;
>
>主题:&nbsp;flink消费Kafka没有数据问题
>
>
>
>我有一个作业,通过Flink去消费神策的Kafka消息数据,参见(https://manual.sensorsdata.cn/sa/latest/page-1573828.html)。但是任务启动之后,Flink任务没有消费任何Kafka消息,我通过神策的kafka console consumer能看到源源不断的数据。
>本身flink作业job里没有任何报错,想问下大家这种情况该如何排查?