flink sql 数据异常导致任务失败

classic Classic list List threaded Threaded
8 messages Options
Reply | Threaded
Open this post in threaded view
|

flink sql 数据异常导致任务失败

nobleyd
kafka source,一条异常数据就会导致任务失败,这种问题怎么解决呢?

以前用DatastreamAPI的话是自己解析,因此可以捕获异常,仅统计非法数据量作为flink的metric指标。

现在是基于flink sql直接基于kakfa创建动态表,查询动态表过程json解析失败,导致任务失败。
Reply | Threaded
Open this post in threaded view
|

Re: flink sql 数据异常导致任务失败

nobleyd
有没有小伙伴生产中有类似问题呢,都怎么解决的呢?
我指通过FlinkSQL方式,结合datastream api方式倒是好解决,只需要把容易出错的地方换成datastream
api,然后捕获所有异常即可。

赵一旦 <[hidden email]> 于2020年8月17日周一 下午7:15写道:

> kafka source,一条异常数据就会导致任务失败,这种问题怎么解决呢?
>
> 以前用DatastreamAPI的话是自己解析,因此可以捕获异常,仅统计非法数据量作为flink的metric指标。
>
> 现在是基于flink sql直接基于kakfa创建动态表,查询动态表过程json解析失败,导致任务失败。
>
Reply | Threaded
Open this post in threaded view
|

Re: flink sql 数据异常导致任务失败

shizk233
考虑修改一下json解析的逻辑来处理异常数据?

赵一旦 <[hidden email]> 于2020年8月18日周二 上午11:59写道:

> 有没有小伙伴生产中有类似问题呢,都怎么解决的呢?
> 我指通过FlinkSQL方式,结合datastream api方式倒是好解决,只需要把容易出错的地方换成datastream
> api,然后捕获所有异常即可。
>
> 赵一旦 <[hidden email]> 于2020年8月17日周一 下午7:15写道:
>
> > kafka source,一条异常数据就会导致任务失败,这种问题怎么解决呢?
> >
> > 以前用DatastreamAPI的话是自己解析,因此可以捕获异常,仅统计非法数据量作为flink的metric指标。
> >
> > 现在是基于flink sql直接基于kakfa创建动态表,查询动态表过程json解析失败,导致任务失败。
> >
>
Reply | Threaded
Open this post in threaded view
|

Re: flink sql 数据异常导致任务失败

nobleyd
我刚刚接触flinksql,主要是感觉这个问题很明显,大家如果生产中使用的话,应该都已经有方案才对,但是好像没啥人有回应。

shizk233 <[hidden email]> 于2020年8月18日周二 下午2:26写道:

> 考虑修改一下json解析的逻辑来处理异常数据?
>
> 赵一旦 <[hidden email]> 于2020年8月18日周二 上午11:59写道:
>
> > 有没有小伙伴生产中有类似问题呢,都怎么解决的呢?
> > 我指通过FlinkSQL方式,结合datastream api方式倒是好解决,只需要把容易出错的地方换成datastream
> > api,然后捕获所有异常即可。
> >
> > 赵一旦 <[hidden email]> 于2020年8月17日周一 下午7:15写道:
> >
> > > kafka source,一条异常数据就会导致任务失败,这种问题怎么解决呢?
> > >
> > > 以前用DatastreamAPI的话是自己解析,因此可以捕获异常,仅统计非法数据量作为flink的metric指标。
> > >
> > > 现在是基于flink sql直接基于kakfa创建动态表,查询动态表过程json解析失败,导致任务失败。
> > >
> >
>
Reply | Threaded
Open this post in threaded view
|

Re: flink sql 数据异常导致任务失败

Jingsong Li
Hi,

最新的版本(1.11+)已经有这个属性可以配置了:
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/formats/json.html#format-options

Best,
Jingsong

On Tue, Aug 18, 2020 at 2:42 PM 赵一旦 <[hidden email]> wrote:

> 我刚刚接触flinksql,主要是感觉这个问题很明显,大家如果生产中使用的话,应该都已经有方案才对,但是好像没啥人有回应。
>
> shizk233 <[hidden email]> 于2020年8月18日周二 下午2:26写道:
>
> > 考虑修改一下json解析的逻辑来处理异常数据?
> >
> > 赵一旦 <[hidden email]> 于2020年8月18日周二 上午11:59写道:
> >
> > > 有没有小伙伴生产中有类似问题呢,都怎么解决的呢?
> > > 我指通过FlinkSQL方式,结合datastream api方式倒是好解决,只需要把容易出错的地方换成datastream
> > > api,然后捕获所有异常即可。
> > >
> > > 赵一旦 <[hidden email]> 于2020年8月17日周一 下午7:15写道:
> > >
> > > > kafka source,一条异常数据就会导致任务失败,这种问题怎么解决呢?
> > > >
> > > > 以前用DatastreamAPI的话是自己解析,因此可以捕获异常,仅统计非法数据量作为flink的metric指标。
> > > >
> > > > 现在是基于flink sql直接基于kakfa创建动态表,查询动态表过程json解析失败,导致任务失败。
> > > >
> > >
> >
>


--
Best, Jingsong Lee
Reply | Threaded
Open this post in threaded view
|

Re: flink sql 数据异常导致任务失败

china_tao
In reply to this post by nobleyd
你kafka里面的是json么?format是json么?

String resultCreateTableSql = createKafkaSourceSQL +" WITH ( " +" 'connector' = 'kafka' ," +" 'topic' = '" + kafkaTopic +"'," +" 'properties.bootstrap.servers' = '" + kafkaBootstrapServers +"'," +" 'properties.group.id' = '" + kafkaGroupId +"'," +" 'format' = '" + kafkaFormat +"'," +" 'scan.startup.mode' = '" + scanStartUpMode +"'," +" 'json.fail-on-missing-field' = 'false'," +" 'json.ignore-parse-errors' = 'true' )";

json.fail-on-missing-field
  json.ignore-parse-errors
  这两个参数你加了么?加了没用?

在 2020/8/18 14:34, 赵一旦 写道:

> 我刚刚接触flinksql,主要是感觉这个问题很明显,大家如果生产中使用的话,应该都已经有方案才对,但是好像没啥人有回应。
>
> shizk233 <[hidden email]> 于2020年8月18日周二 下午2:26写道:
>
>> 考虑修改一下json解析的逻辑来处理异常数据?
>>
>> 赵一旦 <[hidden email]> 于2020年8月18日周二 上午11:59写道:
>>
>>> 有没有小伙伴生产中有类似问题呢,都怎么解决的呢?
>>> 我指通过FlinkSQL方式,结合datastream api方式倒是好解决,只需要把容易出错的地方换成datastream
>>> api,然后捕获所有异常即可。
>>>
>>> 赵一旦 <[hidden email]> 于2020年8月17日周一 下午7:15写道:
>>>
>>>> kafka source,一条异常数据就会导致任务失败,这种问题怎么解决呢?
>>>>
>>>> 以前用DatastreamAPI的话是自己解析,因此可以捕获异常,仅统计非法数据量作为flink的metric指标。
>>>>
>>>> 现在是基于flink sql直接基于kakfa创建动态表,查询动态表过程json解析失败,导致任务失败。
>>>>
Reply | Threaded
Open this post in threaded view
|

Re: flink sql 数据异常导致任务失败

china_tao
In reply to this post by Jingsong Li
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/formats/json.html

官网里面写的很 清楚啊

在 2020/8/18 14:45, Jingsong Li 写道:

> Hi,
>
> 最新的版本(1.11+)已经有这个属性可以配置了:
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/formats/json.html#format-options
>
> Best,
> Jingsong
>
> On Tue, Aug 18, 2020 at 2:42 PM 赵一旦 <[hidden email]> wrote:
>
>> 我刚刚接触flinksql,主要是感觉这个问题很明显,大家如果生产中使用的话,应该都已经有方案才对,但是好像没啥人有回应。
>>
>> shizk233 <[hidden email]> 于2020年8月18日周二 下午2:26写道:
>>
>>> 考虑修改一下json解析的逻辑来处理异常数据?
>>>
>>> 赵一旦 <[hidden email]> 于2020年8月18日周二 上午11:59写道:
>>>
>>>> 有没有小伙伴生产中有类似问题呢,都怎么解决的呢?
>>>> 我指通过FlinkSQL方式,结合datastream api方式倒是好解决,只需要把容易出错的地方换成datastream
>>>> api,然后捕获所有异常即可。
>>>>
>>>> 赵一旦 <[hidden email]> 于2020年8月17日周一 下午7:15写道:
>>>>
>>>>> kafka source,一条异常数据就会导致任务失败,这种问题怎么解决呢?
>>>>>
>>>>> 以前用DatastreamAPI的话是自己解析,因此可以捕获异常,仅统计非法数据量作为flink的metric指标。
>>>>>
>>>>> 现在是基于flink sql直接基于kakfa创建动态表,查询动态表过程json解析失败,导致任务失败。
>>>>>
>

Reply | Threaded
Open this post in threaded view
|

Re: flink sql 数据异常导致任务失败

nobleyd
好吧。我是1.10.


taochanglian <[hidden email]> 于2020年8月18日周二 下午3:03写道:

>
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/formats/json.html
>
> 官网里面写的很 清楚啊
>
> 在 2020/8/18 14:45, Jingsong Li 写道:
> > Hi,
> >
> > 最新的版本(1.11+)已经有这个属性可以配置了:
> >
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/formats/json.html#format-options
> >
> > Best,
> > Jingsong
> >
> > On Tue, Aug 18, 2020 at 2:42 PM 赵一旦 <[hidden email]> wrote:
> >
> >> 我刚刚接触flinksql,主要是感觉这个问题很明显,大家如果生产中使用的话,应该都已经有方案才对,但是好像没啥人有回应。
> >>
> >> shizk233 <[hidden email]> 于2020年8月18日周二 下午2:26写道:
> >>
> >>> 考虑修改一下json解析的逻辑来处理异常数据?
> >>>
> >>> 赵一旦 <[hidden email]> 于2020年8月18日周二 上午11:59写道:
> >>>
> >>>> 有没有小伙伴生产中有类似问题呢,都怎么解决的呢?
> >>>> 我指通过FlinkSQL方式,结合datastream api方式倒是好解决,只需要把容易出错的地方换成datastream
> >>>> api,然后捕获所有异常即可。
> >>>>
> >>>> 赵一旦 <[hidden email]> 于2020年8月17日周一 下午7:15写道:
> >>>>
> >>>>> kafka source,一条异常数据就会导致任务失败,这种问题怎么解决呢?
> >>>>>
> >>>>> 以前用DatastreamAPI的话是自己解析,因此可以捕获异常,仅统计非法数据量作为flink的metric指标。
> >>>>>
> >>>>> 现在是基于flink sql直接基于kakfa创建动态表,查询动态表过程json解析失败,导致任务失败。
> >>>>>
> >
>
>