我有一个作业,通过Flink去消费神策的Kafka消息数据,参见(https://manual.sensorsdata.cn/sa/latest/page-1573828.html)。但是任务启动之后,Flink任务没有消费任何Kafka消息,我通过神策的kafka console consumer能看到源源不断的数据。
本身flink作业job里没有任何报错,想问下大家这种情况该如何排查? |
第一,查看神策的kafka的配置项advertised.host.name
------------------ 原始邮件 ------------------ 发件人: "sunfulin"<[hidden email]>; 发送时间: 2020年1月10日(星期五) 上午9:51 收件人: "[hidden email]"<[hidden email]>; 主题: flink消费Kafka没有数据问题 我有一个作业,通过Flink去消费神策的Kafka消息数据,参见(https://manual.sensorsdata.cn/sa/latest/page-1573828.html)。但是任务启动之后,Flink任务没有消费任何Kafka消息,我通过神策的kafka console consumer能看到源源不断的数据。 本身flink作业job里没有任何报错,想问下大家这种情况该如何排查? |
In reply to this post by sunfulin
第一 查看神策kafka的配置advertised.host.name和advertised.port配置
第二 网络配置就按照如下: 启动订阅的机器需与部署神策分析的机器在同一个内网,且必须可以解析神策分析服务器的 host; 请选用兼容的 Kafka 客户端版本,高版本服务端兼容低版本客户端,反之则可能存在兼容性问题。神策 Kafka 服务端版本一部分早期用户为 0.8.2.x,后期新用户为 0.10.x,具体情况可在服务器上查看; 仅私有部署版支持通过 Kafka 订阅数据; 如果在自己的机器上订阅数据,需要先在自己机器上 hosts 中配上神策的服务器,订阅的时候 zookeeper 填写所有机器的 hostname,例如:hostname1:2181,hostname2:2181,hostname3:2181 第三、去神策的kafka下查看一下日志,看有没有外部消费者连接报错信息 第四、有报错信息就解决一下 ------------------ 原始邮件 ------------------ 发件人: "sunfulin"<[hidden email]>; 发送时间: 2020年1月10日(星期五) 上午9:51 收件人: "[hidden email]"<[hidden email]>; 主题: flink消费Kafka没有数据问题 我有一个作业,通过Flink去消费神策的Kafka消息数据,参见(https://manual.sensorsdata.cn/sa/latest/page-1573828.html)。但是任务启动之后,Flink任务没有消费任何Kafka消息,我通过神策的kafka console consumer能看到源源不断的数据。 本身flink作业job里没有任何报错,想问下大家这种情况该如何排查? |
In reply to this post by sunfulin
可以看下source的并行度是否大于kafka分区数量
---原始邮件--- 发件人: "sunfulin"<[hidden email]> 发送时间: 2020年1月10日(周五) 上午9:51 收件人: "[hidden email]"<[hidden email]>; 主题: flink消费Kafka没有数据问题 我有一个作业,通过Flink去消费神策的Kafka消息数据,参见(https://manual.sensorsdata.cn/sa/latest/page-1573828.html)。但是任务启动之后,Flink任务没有消费任何Kafka消息,我通过神策的kafka console consumer能看到源源不断的数据。 本身flink作业job里没有任何报错,想问下大家这种情况该如何排查? |
In reply to this post by Evan
感谢回复,排查后确实是hostname的配置问题。
任务还遇到了另外一个问题。下面是读取的Kafka连接配置,使用JSON SCHEMA来解析。不过实际运行时却抛出了如下异常,请问有大神知道是啥原因么? Caused by: java.lang.NullPointerException: Null result cannot be used for atomic types. at DataStreamSinkConversion$5.map(Unknown Source) at org.apache.flink.table.runtime.CRowMapRunner.map(CRowMapRunner.scala:55) at org.apache.flink.table.runtime.CRowMapRunner.map(CRowMapRunner.scala:34) at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534) at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718) at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696) at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51) at org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.scala:37) at org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.scala:28) at DataStreamSourceConversion$2.processElement(Unknown Source) at org.apache.flink.table.runtime.CRowOutputProcessRunner.processElement(CRowOutputProcessRunner.scala:70) at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534) at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718) at org.apache.flink.streaming. tableEnv.connect( new Kafka() .version(kafkaInstance.getVersion()) .topic(chooseKafkaTopic(initPack.clusterMode)) .property("bootstrap.servers", kafkaInstance.getBrokerList()) .property("group.id", initPack.jobName) .startFromEarliest() // 测试用,上生产可以去掉 ).withSchema( new Schema() // 时间戳字段 .field("rowtime", Types.SQL_TIMESTAMP).rowtime( new Rowtime() .timestampsFromField("time") .watermarksPeriodicBounded(1000) ) .field("type", Types.STRING) .field("event", Types.STRING) .field("user_id", Types.STRING) .field("distinct_id", Types.STRING) .field("project", Types.STRING) .field("recv_time", Types.SQL_TIMESTAMP) .field("properties", Types.ROW_NAMED( new String[] { "BROWSER_VERSION", "pathname", "search", "eventType", "message", "stack", "componentStack" }, Types.STRING, Types.STRING, Types.STRING, Types.STRING, Types.STRING, Types.STRING, Types.STRING) ) ).withFormat( new Json().failOnMissingField(false) .deriveSchema() ) .inAppendMode() .registerTableSource(getTableName()); 在 2020-01-10 09:53:52,"Evan" <[hidden email]> 写道: >第一,查看神策的kafka的配置项advertised.host.name > > > > >------------------ 原始邮件 ------------------ >发件人: "sunfulin"<[hidden email]>; >发送时间: 2020年1月10日(星期五) 上午9:51 >收件人: "[hidden email]"<[hidden email]>; > >主题: flink消费Kafka没有数据问题 > > > >我有一个作业,通过Flink去消费神策的Kafka消息数据,参见(https://manual.sensorsdata.cn/sa/latest/page-1573828.html)。但是任务启动之后,Flink任务没有消费任何Kafka消息,我通过神策的kafka console consumer能看到源源不断的数据。 >本身flink作业job里没有任何报错,想问下大家这种情况该如何排查? |
Free forum by Nabble | Edit this page |