最近在写Flink SQL处理数据的时候发现Flink自带的function不太能满足使用,像很常见的Json2Array
Json2Map貌似官方都没有实现,所以需要自定义函数来实现,然后我自己尝试用Jackson来实现时发现在语法检查时总是会报 `Type is not supported: ANY`,个人猜想这个是不是和Java泛型的特性有关,由于Java是假泛型,最终泛型会被擦除编程Object才会引发这个异常呢?想到Flink本身也有一个字符串转容器的函数 STR_TO_MAP,看了下该函数实现是用Scala编写,不确定该异常是否真是由泛型引起,如果是,如果想要Java写Json2Array Json2Map udf应该怎么操作呢?求前辈指导 udfd代码如下: public class Json2List extends ScalarFunction { private static final Logger LOG = LoggerFactory.getLogger(Json2List.class); private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper() .configure(JsonParser.Feature.ALLOW_SINGLE_QUOTES, true) .configure(JsonParser.Feature.ALLOW_UNQUOTED_CONTROL_CHARS, true) ; public Json2List(){} public List<String> eval(String param) { List<String> result = new ArrayList<>(); try { List<Map<Object, Object>> list = OBJECT_MAPPER.readValue(param, List.class); for(Map<Object, Object> map : list){ result.add(OBJECT_MAPPER.writeValueAsString(map)); } return result; } catch (JsonProcessingException e){ LOG.error("failed to convert json to array, param is: {}", param, e); } return result; } @Override public TypeInformation<List<String>> getResultType(Class<?>[] signature) { return Types.LIST(Types.STRING); } } |
目前转List可以用数组代替,Map貌似没法成功运行
zilong xiao <[hidden email]> 于2020年8月3日周一 上午10:43写道: > 最近在写Flink SQL处理数据的时候发现Flink自带的function不太能满足使用,像很常见的Json2Array > Json2Map貌似官方都没有实现,所以需要自定义函数来实现,然后我自己尝试用Jackson来实现时发现在语法检查时总是会报 `Type is not > supported: > ANY`,个人猜想这个是不是和Java泛型的特性有关,由于Java是假泛型,最终泛型会被擦除编程Object才会引发这个异常呢?想到Flink本身也有一个字符串转容器的函数 > STR_TO_MAP,看了下该函数实现是用Scala编写,不确定该异常是否真是由泛型引起,如果是,如果想要Java写Json2Array Json2Map > udf应该怎么操作呢?求前辈指导 > > udfd代码如下: > > public class Json2List extends ScalarFunction { > > private static final Logger LOG = LoggerFactory.getLogger(Json2List.class); > > private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper() > .configure(JsonParser.Feature.ALLOW_SINGLE_QUOTES, true) > .configure(JsonParser.Feature.ALLOW_UNQUOTED_CONTROL_CHARS, true) ; > > public Json2List(){} > > public List<String> eval(String param) { > List<String> result = new ArrayList<>(); > try { > List<Map<Object, Object>> list = OBJECT_MAPPER.readValue(param, List.class); > for(Map<Object, Object> map : list){ > result.add(OBJECT_MAPPER.writeValueAsString(map)); > } > return result; > } catch (JsonProcessingException e){ > LOG.error("failed to convert json to array, param is: {}", param, e); > } > return result; > } > > > @Override > public TypeInformation<List<String>> getResultType(Class<?>[] signature) { > return Types.LIST(Types.STRING); > } > > } > > |
你把Map<Object, Object>换为Map<String, String>试试
zilong xiao <[hidden email]> 于2020年8月3日周一 下午4:56写道: > 目前转List可以用数组代替,Map貌似没法成功运行 > > zilong xiao <[hidden email]> 于2020年8月3日周一 上午10:43写道: > > > 最近在写Flink SQL处理数据的时候发现Flink自带的function不太能满足使用,像很常见的Json2Array > > Json2Map貌似官方都没有实现,所以需要自定义函数来实现,然后我自己尝试用Jackson来实现时发现在语法检查时总是会报 `Type is > not > > supported: > > > ANY`,个人猜想这个是不是和Java泛型的特性有关,由于Java是假泛型,最终泛型会被擦除编程Object才会引发这个异常呢?想到Flink本身也有一个字符串转容器的函数 > > STR_TO_MAP,看了下该函数实现是用Scala编写,不确定该异常是否真是由泛型引起,如果是,如果想要Java写Json2Array > Json2Map > > udf应该怎么操作呢?求前辈指导 > > > > udfd代码如下: > > > > public class Json2List extends ScalarFunction { > > > > private static final Logger LOG = > LoggerFactory.getLogger(Json2List.class); > > > > private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper() > > .configure(JsonParser.Feature.ALLOW_SINGLE_QUOTES, true) > > .configure(JsonParser.Feature.ALLOW_UNQUOTED_CONTROL_CHARS, true) ; > > > > public Json2List(){} > > > > public List<String> eval(String param) { > > List<String> result = new ArrayList<>(); > > try { > > List<Map<Object, Object>> list = OBJECT_MAPPER.readValue(param, > List.class); > > for(Map<Object, Object> map : list){ > > result.add(OBJECT_MAPPER.writeValueAsString(map)); > > } > > return result; > > } catch (JsonProcessingException e){ > > LOG.error("failed to convert json to array, param is: {}", > param, e); > > } > > return result; > > } > > > > > > @Override > > public TypeInformation<List<String>> getResultType(Class<?>[] > signature) { > > return Types.LIST(Types.STRING); > > } > > > > } > > > > > |
不行的,试过了,遍历map把里面的entity全部toString都不行,应该是Java泛型机制的原因,不知道社区大佬怎么看待这个问题
godfrey he <[hidden email]> 于2020年8月3日周一 下午7:50写道: > 你把Map<Object, Object>换为Map<String, String>试试 > > zilong xiao <[hidden email]> 于2020年8月3日周一 下午4:56写道: > > > 目前转List可以用数组代替,Map貌似没法成功运行 > > > > zilong xiao <[hidden email]> 于2020年8月3日周一 上午10:43写道: > > > > > 最近在写Flink SQL处理数据的时候发现Flink自带的function不太能满足使用,像很常见的Json2Array > > > Json2Map貌似官方都没有实现,所以需要自定义函数来实现,然后我自己尝试用Jackson来实现时发现在语法检查时总是会报 `Type is > > not > > > supported: > > > > > > ANY`,个人猜想这个是不是和Java泛型的特性有关,由于Java是假泛型,最终泛型会被擦除编程Object才会引发这个异常呢?想到Flink本身也有一个字符串转容器的函数 > > > STR_TO_MAP,看了下该函数实现是用Scala编写,不确定该异常是否真是由泛型引起,如果是,如果想要Java写Json2Array > > Json2Map > > > udf应该怎么操作呢?求前辈指导 > > > > > > udfd代码如下: > > > > > > public class Json2List extends ScalarFunction { > > > > > > private static final Logger LOG = > > LoggerFactory.getLogger(Json2List.class); > > > > > > private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper() > > > .configure(JsonParser.Feature.ALLOW_SINGLE_QUOTES, true) > > > .configure(JsonParser.Feature.ALLOW_UNQUOTED_CONTROL_CHARS, > true) ; > > > > > > public Json2List(){} > > > > > > public List<String> eval(String param) { > > > List<String> result = new ArrayList<>(); > > > try { > > > List<Map<Object, Object>> list = > OBJECT_MAPPER.readValue(param, > > List.class); > > > for(Map<Object, Object> map : list){ > > > result.add(OBJECT_MAPPER.writeValueAsString(map)); > > > } > > > return result; > > > } catch (JsonProcessingException e){ > > > LOG.error("failed to convert json to array, param is: {}", > > param, e); > > > } > > > return result; > > > } > > > > > > > > > @Override > > > public TypeInformation<List<String>> getResultType(Class<?>[] > > signature) { > > > return Types.LIST(Types.STRING); > > > } > > > > > > } > > > > > > > > > |
Hi zilong,
SQL里面的ARRAY类型,对应的legacy type,应该是Types.PRIMITIVE_ARRAY或者Types.OBJECT_ARRAY, 其他类型的type information会被当做any类型来处理。 这里应该跟泛型没有关系,就是在实现的时候并没有考虑将Types.LIST(Types.STRING)当做SQL里面的ARRAY类型。 支持List作为ARRAY的数据,应该要在1.12才能支持[1]。 [1] https://issues.apache.org/jira/browse/FLINK-18417 zilong xiao <[hidden email]> 于2020年8月3日周一 下午8:23写道: > 不行的,试过了,遍历map把里面的entity全部toString都不行,应该是Java泛型机制的原因,不知道社区大佬怎么看待这个问题 > > godfrey he <[hidden email]> 于2020年8月3日周一 下午7:50写道: > > > 你把Map<Object, Object>换为Map<String, String>试试 > > > > zilong xiao <[hidden email]> 于2020年8月3日周一 下午4:56写道: > > > > > 目前转List可以用数组代替,Map貌似没法成功运行 > > > > > > zilong xiao <[hidden email]> 于2020年8月3日周一 上午10:43写道: > > > > > > > 最近在写Flink SQL处理数据的时候发现Flink自带的function不太能满足使用,像很常见的Json2Array > > > > Json2Map貌似官方都没有实现,所以需要自定义函数来实现,然后我自己尝试用Jackson来实现时发现在语法检查时总是会报 `Type > is > > > not > > > > supported: > > > > > > > > > > ANY`,个人猜想这个是不是和Java泛型的特性有关,由于Java是假泛型,最终泛型会被擦除编程Object才会引发这个异常呢?想到Flink本身也有一个字符串转容器的函数 > > > > STR_TO_MAP,看了下该函数实现是用Scala编写,不确定该异常是否真是由泛型引起,如果是,如果想要Java写Json2Array > > > Json2Map > > > > udf应该怎么操作呢?求前辈指导 > > > > > > > > udfd代码如下: > > > > > > > > public class Json2List extends ScalarFunction { > > > > > > > > private static final Logger LOG = > > > LoggerFactory.getLogger(Json2List.class); > > > > > > > > private static final ObjectMapper OBJECT_MAPPER = new > ObjectMapper() > > > > .configure(JsonParser.Feature.ALLOW_SINGLE_QUOTES, true) > > > > .configure(JsonParser.Feature.ALLOW_UNQUOTED_CONTROL_CHARS, > > true) ; > > > > > > > > public Json2List(){} > > > > > > > > public List<String> eval(String param) { > > > > List<String> result = new ArrayList<>(); > > > > try { > > > > List<Map<Object, Object>> list = > > OBJECT_MAPPER.readValue(param, > > > List.class); > > > > for(Map<Object, Object> map : list){ > > > > result.add(OBJECT_MAPPER.writeValueAsString(map)); > > > > } > > > > return result; > > > > } catch (JsonProcessingException e){ > > > > LOG.error("failed to convert json to array, param is: {}", > > > param, e); > > > > } > > > > return result; > > > > } > > > > > > > > > > > > @Override > > > > public TypeInformation<List<String>> getResultType(Class<?>[] > > > signature) { > > > > return Types.LIST(Types.STRING); > > > > } > > > > > > > > } > > > > > > > > > > > > > > -- Best, Benchao Li |
感谢benchao大佬回答,对于Java用户而言,Map容器用的也比较多,目前有用户需要从json字符串中取对应key的值,目前的做法是提供了一个getValueFromJson的UDF,每次要取值时都要将json反序列化成Map然后get(key),有什么方法可以使得UDF直接返回Map类型吗?这样就可以避免每次都去反序列化然后取值
Benchao Li <[hidden email]> 于2020年8月5日周三 下午11:49写道: > Hi zilong, > > SQL里面的ARRAY类型,对应的legacy type,应该是Types.PRIMITIVE_ARRAY或者Types.OBJECT_ARRAY, > 其他类型的type information会被当做any类型来处理。 > 这里应该跟泛型没有关系,就是在实现的时候并没有考虑将Types.LIST(Types.STRING)当做SQL里面的ARRAY类型。 > 支持List作为ARRAY的数据,应该要在1.12才能支持[1]。 > > [1] https://issues.apache.org/jira/browse/FLINK-18417 > > zilong xiao <[hidden email]> 于2020年8月3日周一 下午8:23写道: > > > 不行的,试过了,遍历map把里面的entity全部toString都不行,应该是Java泛型机制的原因,不知道社区大佬怎么看待这个问题 > > > > godfrey he <[hidden email]> 于2020年8月3日周一 下午7:50写道: > > > > > 你把Map<Object, Object>换为Map<String, String>试试 > > > > > > zilong xiao <[hidden email]> 于2020年8月3日周一 下午4:56写道: > > > > > > > 目前转List可以用数组代替,Map貌似没法成功运行 > > > > > > > > zilong xiao <[hidden email]> 于2020年8月3日周一 上午10:43写道: > > > > > > > > > 最近在写Flink SQL处理数据的时候发现Flink自带的function不太能满足使用,像很常见的Json2Array > > > > > Json2Map貌似官方都没有实现,所以需要自定义函数来实现,然后我自己尝试用Jackson来实现时发现在语法检查时总是会报 > `Type > > is > > > > not > > > > > supported: > > > > > > > > > > > > > > > ANY`,个人猜想这个是不是和Java泛型的特性有关,由于Java是假泛型,最终泛型会被擦除编程Object才会引发这个异常呢?想到Flink本身也有一个字符串转容器的函数 > > > > > > STR_TO_MAP,看了下该函数实现是用Scala编写,不确定该异常是否真是由泛型引起,如果是,如果想要Java写Json2Array > > > > Json2Map > > > > > udf应该怎么操作呢?求前辈指导 > > > > > > > > > > udfd代码如下: > > > > > > > > > > public class Json2List extends ScalarFunction { > > > > > > > > > > private static final Logger LOG = > > > > LoggerFactory.getLogger(Json2List.class); > > > > > > > > > > private static final ObjectMapper OBJECT_MAPPER = new > > ObjectMapper() > > > > > .configure(JsonParser.Feature.ALLOW_SINGLE_QUOTES, true) > > > > > .configure(JsonParser.Feature.ALLOW_UNQUOTED_CONTROL_CHARS, > > > true) ; > > > > > > > > > > public Json2List(){} > > > > > > > > > > public List<String> eval(String param) { > > > > > List<String> result = new ArrayList<>(); > > > > > try { > > > > > List<Map<Object, Object>> list = > > > OBJECT_MAPPER.readValue(param, > > > > List.class); > > > > > for(Map<Object, Object> map : list){ > > > > > result.add(OBJECT_MAPPER.writeValueAsString(map)); > > > > > } > > > > > return result; > > > > > } catch (JsonProcessingException e){ > > > > > LOG.error("failed to convert json to array, param is: {}", > > > > param, e); > > > > > } > > > > > return result; > > > > > } > > > > > > > > > > > > > > > @Override > > > > > public TypeInformation<List<String>> getResultType(Class<?>[] > > > > signature) { > > > > > return Types.LIST(Types.STRING); > > > > > } > > > > > > > > > > } > > > > > > > > > > > > > > > > > > > > > > -- > > Best, > Benchao Li > |
可以直接返回Map类型呀,比如:
public class String2Map extends ScalarFunction { public Map<String, String> eval(String param) throws Exception { Map<String, String> map = new HashMap<>(); // ... return map; } @Override public TypeInformation<?> getResultType(Class<?>[] signature) { return Types.MAP(Types.STRING, Types.STRING); } } zilong xiao <[hidden email]> 于2020年8月6日周四 上午10:24写道: > > 感谢benchao大佬回答,对于Java用户而言,Map容器用的也比较多,目前有用户需要从json字符串中取对应key的值,目前的做法是提供了一个getValueFromJson的UDF,每次要取值时都要将json反序列化成Map然后get(key),有什么方法可以使得UDF直接返回Map类型吗?这样就可以避免每次都去反序列化然后取值 > > Benchao Li <[hidden email]> 于2020年8月5日周三 下午11:49写道: > > > Hi zilong, > > > > SQL里面的ARRAY类型,对应的legacy > type,应该是Types.PRIMITIVE_ARRAY或者Types.OBJECT_ARRAY, > > 其他类型的type information会被当做any类型来处理。 > > 这里应该跟泛型没有关系,就是在实现的时候并没有考虑将Types.LIST(Types.STRING)当做SQL里面的ARRAY类型。 > > 支持List作为ARRAY的数据,应该要在1.12才能支持[1]。 > > > > [1] https://issues.apache.org/jira/browse/FLINK-18417 > > > > zilong xiao <[hidden email]> 于2020年8月3日周一 下午8:23写道: > > > > > 不行的,试过了,遍历map把里面的entity全部toString都不行,应该是Java泛型机制的原因,不知道社区大佬怎么看待这个问题 > > > > > > godfrey he <[hidden email]> 于2020年8月3日周一 下午7:50写道: > > > > > > > 你把Map<Object, Object>换为Map<String, String>试试 > > > > > > > > zilong xiao <[hidden email]> 于2020年8月3日周一 下午4:56写道: > > > > > > > > > 目前转List可以用数组代替,Map貌似没法成功运行 > > > > > > > > > > zilong xiao <[hidden email]> 于2020年8月3日周一 上午10:43写道: > > > > > > > > > > > 最近在写Flink SQL处理数据的时候发现Flink自带的function不太能满足使用,像很常见的Json2Array > > > > > > Json2Map貌似官方都没有实现,所以需要自定义函数来实现,然后我自己尝试用Jackson来实现时发现在语法检查时总是会报 > > `Type > > > is > > > > > not > > > > > > supported: > > > > > > > > > > > > > > > > > > > > > ANY`,个人猜想这个是不是和Java泛型的特性有关,由于Java是假泛型,最终泛型会被擦除编程Object才会引发这个异常呢?想到Flink本身也有一个字符串转容器的函数 > > > > > > > > STR_TO_MAP,看了下该函数实现是用Scala编写,不确定该异常是否真是由泛型引起,如果是,如果想要Java写Json2Array > > > > > Json2Map > > > > > > udf应该怎么操作呢?求前辈指导 > > > > > > > > > > > > udfd代码如下: > > > > > > > > > > > > public class Json2List extends ScalarFunction { > > > > > > > > > > > > private static final Logger LOG = > > > > > LoggerFactory.getLogger(Json2List.class); > > > > > > > > > > > > private static final ObjectMapper OBJECT_MAPPER = new > > > ObjectMapper() > > > > > > .configure(JsonParser.Feature.ALLOW_SINGLE_QUOTES, true) > > > > > > .configure(JsonParser.Feature.ALLOW_UNQUOTED_CONTROL_CHARS, > > > > true) ; > > > > > > > > > > > > public Json2List(){} > > > > > > > > > > > > public List<String> eval(String param) { > > > > > > List<String> result = new ArrayList<>(); > > > > > > try { > > > > > > List<Map<Object, Object>> list = > > > > OBJECT_MAPPER.readValue(param, > > > > > List.class); > > > > > > for(Map<Object, Object> map : list){ > > > > > > result.add(OBJECT_MAPPER.writeValueAsString(map)); > > > > > > } > > > > > > return result; > > > > > > } catch (JsonProcessingException e){ > > > > > > LOG.error("failed to convert json to array, param is: > {}", > > > > > param, e); > > > > > > } > > > > > > return result; > > > > > > } > > > > > > > > > > > > > > > > > > @Override > > > > > > public TypeInformation<List<String>> getResultType(Class<?>[] > > > > > signature) { > > > > > > return Types.LIST(Types.STRING); > > > > > > } > > > > > > > > > > > > } > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > -- > > > > Best, > > Benchao Li > > > -- Best, Benchao Li |
我这么写过,貌似不行,下面是我的代码,可否看下是否可行?
public class Json2Map extends ScalarFunction { private static final Logger LOG = LoggerFactory.getLogger(Json2Map.class); private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); public Json2Map(){} public Map<String, String> eval(String param) { Map<String, String> result = new HashMap<>(); try { if (param == null) { return result; } result = OBJECT_MAPPER.readValue(param, Map.class); // 遍历toString 貌似也不行 //for(Object obj : tmp.keySet()){ // result.put(String.valueOf(obj), String.valueOf(tmp.get(obj))); //} LOG.info("result is: {}", result); } catch (JsonProcessingException e){ LOG.error("failed to convert json to map, param is: {}", param, e); } return result; } @Override public TypeInformation<Map<String, String>> getResultType(Class<?>[] signature) { return Types.MAP(Types.STRING, Types.STRING); } } Benchao Li <[hidden email]> 于2020年8月6日周四 上午11:04写道: > 可以直接返回Map类型呀,比如: > > public class String2Map extends ScalarFunction { > > public Map<String, String> eval(String param) throws Exception { > Map<String, String> map = new HashMap<>(); > // ... > return map; > } > > @Override > public TypeInformation<?> getResultType(Class<?>[] signature) { > return Types.MAP(Types.STRING, Types.STRING); > } > > } > > > zilong xiao <[hidden email]> 于2020年8月6日周四 上午10:24写道: > > > > > > 感谢benchao大佬回答,对于Java用户而言,Map容器用的也比较多,目前有用户需要从json字符串中取对应key的值,目前的做法是提供了一个getValueFromJson的UDF,每次要取值时都要将json反序列化成Map然后get(key),有什么方法可以使得UDF直接返回Map类型吗?这样就可以避免每次都去反序列化然后取值 > > > > Benchao Li <[hidden email]> 于2020年8月5日周三 下午11:49写道: > > > > > Hi zilong, > > > > > > SQL里面的ARRAY类型,对应的legacy > > type,应该是Types.PRIMITIVE_ARRAY或者Types.OBJECT_ARRAY, > > > 其他类型的type information会被当做any类型来处理。 > > > 这里应该跟泛型没有关系,就是在实现的时候并没有考虑将Types.LIST(Types.STRING)当做SQL里面的ARRAY类型。 > > > 支持List作为ARRAY的数据,应该要在1.12才能支持[1]。 > > > > > > [1] https://issues.apache.org/jira/browse/FLINK-18417 > > > > > > zilong xiao <[hidden email]> 于2020年8月3日周一 下午8:23写道: > > > > > > > 不行的,试过了,遍历map把里面的entity全部toString都不行,应该是Java泛型机制的原因,不知道社区大佬怎么看待这个问题 > > > > > > > > godfrey he <[hidden email]> 于2020年8月3日周一 下午7:50写道: > > > > > > > > > 你把Map<Object, Object>换为Map<String, String>试试 > > > > > > > > > > zilong xiao <[hidden email]> 于2020年8月3日周一 下午4:56写道: > > > > > > > > > > > 目前转List可以用数组代替,Map貌似没法成功运行 > > > > > > > > > > > > zilong xiao <[hidden email]> 于2020年8月3日周一 上午10:43写道: > > > > > > > > > > > > > 最近在写Flink SQL处理数据的时候发现Flink自带的function不太能满足使用,像很常见的Json2Array > > > > > > > Json2Map貌似官方都没有实现,所以需要自定义函数来实现,然后我自己尝试用Jackson来实现时发现在语法检查时总是会报 > > > `Type > > > > is > > > > > > not > > > > > > > supported: > > > > > > > > > > > > > > > > > > > > > > > > > > > > ANY`,个人猜想这个是不是和Java泛型的特性有关,由于Java是假泛型,最终泛型会被擦除编程Object才会引发这个异常呢?想到Flink本身也有一个字符串转容器的函数 > > > > > > > > > > STR_TO_MAP,看了下该函数实现是用Scala编写,不确定该异常是否真是由泛型引起,如果是,如果想要Java写Json2Array > > > > > > Json2Map > > > > > > > udf应该怎么操作呢?求前辈指导 > > > > > > > > > > > > > > udfd代码如下: > > > > > > > > > > > > > > public class Json2List extends ScalarFunction { > > > > > > > > > > > > > > private static final Logger LOG = > > > > > > LoggerFactory.getLogger(Json2List.class); > > > > > > > > > > > > > > private static final ObjectMapper OBJECT_MAPPER = new > > > > ObjectMapper() > > > > > > > .configure(JsonParser.Feature.ALLOW_SINGLE_QUOTES, true) > > > > > > > > .configure(JsonParser.Feature.ALLOW_UNQUOTED_CONTROL_CHARS, > > > > > true) ; > > > > > > > > > > > > > > public Json2List(){} > > > > > > > > > > > > > > public List<String> eval(String param) { > > > > > > > List<String> result = new ArrayList<>(); > > > > > > > try { > > > > > > > List<Map<Object, Object>> list = > > > > > OBJECT_MAPPER.readValue(param, > > > > > > List.class); > > > > > > > for(Map<Object, Object> map : list){ > > > > > > > result.add(OBJECT_MAPPER.writeValueAsString(map)); > > > > > > > } > > > > > > > return result; > > > > > > > } catch (JsonProcessingException e){ > > > > > > > LOG.error("failed to convert json to array, param is: > > {}", > > > > > > param, e); > > > > > > > } > > > > > > > return result; > > > > > > > } > > > > > > > > > > > > > > > > > > > > > @Override > > > > > > > public TypeInformation<List<String>> > getResultType(Class<?>[] > > > > > > signature) { > > > > > > > return Types.LIST(Types.STRING); > > > > > > > } > > > > > > > > > > > > > > } > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > -- > > > > > > Best, > > > Benchao Li > > > > > > > > -- > > Best, > Benchao Li > |
看起来写法没啥问题,我们就是这么用的。
你用的是哪个版本的Flink?然后是怎么注册的UDF呢? zilong xiao <[hidden email]> 于2020年8月6日周四 下午12:06写道: > 我这么写过,貌似不行,下面是我的代码,可否看下是否可行? > > public class Json2Map extends ScalarFunction { > > private static final Logger LOG = > LoggerFactory.getLogger(Json2Map.class); > > private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); > > public Json2Map(){} > > public Map<String, String> eval(String param) { > Map<String, String> result = new HashMap<>(); > try { > if (param == null) { > return result; > } > result = OBJECT_MAPPER.readValue(param, Map.class); > // 遍历toString 貌似也不行 > //for(Object obj : tmp.keySet()){ > // result.put(String.valueOf(obj), String.valueOf(tmp.get(obj))); > //} > LOG.info("result is: {}", result); > } catch (JsonProcessingException e){ > LOG.error("failed to convert json to map, param is: {}", param, > e); > } > return result; > } > > > @Override > public TypeInformation<Map<String, String>> > getResultType(Class<?>[] signature) { > return Types.MAP(Types.STRING, Types.STRING); > } > > } > > > Benchao Li <[hidden email]> 于2020年8月6日周四 上午11:04写道: > > > 可以直接返回Map类型呀,比如: > > > > public class String2Map extends ScalarFunction { > > > > public Map<String, String> eval(String param) throws Exception { > > Map<String, String> map = new HashMap<>(); > > // ... > > return map; > > } > > > > @Override > > public TypeInformation<?> getResultType(Class<?>[] signature) { > > return Types.MAP(Types.STRING, Types.STRING); > > } > > > > } > > > > > > zilong xiao <[hidden email]> 于2020年8月6日周四 上午10:24写道: > > > > > > > > > > > 感谢benchao大佬回答,对于Java用户而言,Map容器用的也比较多,目前有用户需要从json字符串中取对应key的值,目前的做法是提供了一个getValueFromJson的UDF,每次要取值时都要将json反序列化成Map然后get(key),有什么方法可以使得UDF直接返回Map类型吗?这样就可以避免每次都去反序列化然后取值 > > > > > > Benchao Li <[hidden email]> 于2020年8月5日周三 下午11:49写道: > > > > > > > Hi zilong, > > > > > > > > SQL里面的ARRAY类型,对应的legacy > > > type,应该是Types.PRIMITIVE_ARRAY或者Types.OBJECT_ARRAY, > > > > 其他类型的type information会被当做any类型来处理。 > > > > 这里应该跟泛型没有关系,就是在实现的时候并没有考虑将Types.LIST(Types.STRING)当做SQL里面的ARRAY类型。 > > > > 支持List作为ARRAY的数据,应该要在1.12才能支持[1]。 > > > > > > > > [1] https://issues.apache.org/jira/browse/FLINK-18417 > > > > > > > > zilong xiao <[hidden email]> 于2020年8月3日周一 下午8:23写道: > > > > > > > > > 不行的,试过了,遍历map把里面的entity全部toString都不行,应该是Java泛型机制的原因,不知道社区大佬怎么看待这个问题 > > > > > > > > > > godfrey he <[hidden email]> 于2020年8月3日周一 下午7:50写道: > > > > > > > > > > > 你把Map<Object, Object>换为Map<String, String>试试 > > > > > > > > > > > > zilong xiao <[hidden email]> 于2020年8月3日周一 下午4:56写道: > > > > > > > > > > > > > 目前转List可以用数组代替,Map貌似没法成功运行 > > > > > > > > > > > > > > zilong xiao <[hidden email]> 于2020年8月3日周一 上午10:43写道: > > > > > > > > > > > > > > > 最近在写Flink SQL处理数据的时候发现Flink自带的function不太能满足使用,像很常见的Json2Array > > > > > > > > > Json2Map貌似官方都没有实现,所以需要自定义函数来实现,然后我自己尝试用Jackson来实现时发现在语法检查时总是会报 > > > > `Type > > > > > is > > > > > > > not > > > > > > > > supported: > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > ANY`,个人猜想这个是不是和Java泛型的特性有关,由于Java是假泛型,最终泛型会被擦除编程Object才会引发这个异常呢?想到Flink本身也有一个字符串转容器的函数 > > > > > > > > > > > > STR_TO_MAP,看了下该函数实现是用Scala编写,不确定该异常是否真是由泛型引起,如果是,如果想要Java写Json2Array > > > > > > > Json2Map > > > > > > > > udf应该怎么操作呢?求前辈指导 > > > > > > > > > > > > > > > > udfd代码如下: > > > > > > > > > > > > > > > > public class Json2List extends ScalarFunction { > > > > > > > > > > > > > > > > private static final Logger LOG = > > > > > > > LoggerFactory.getLogger(Json2List.class); > > > > > > > > > > > > > > > > private static final ObjectMapper OBJECT_MAPPER = new > > > > > ObjectMapper() > > > > > > > > .configure(JsonParser.Feature.ALLOW_SINGLE_QUOTES, > true) > > > > > > > > > > .configure(JsonParser.Feature.ALLOW_UNQUOTED_CONTROL_CHARS, > > > > > > true) ; > > > > > > > > > > > > > > > > public Json2List(){} > > > > > > > > > > > > > > > > public List<String> eval(String param) { > > > > > > > > List<String> result = new ArrayList<>(); > > > > > > > > try { > > > > > > > > List<Map<Object, Object>> list = > > > > > > OBJECT_MAPPER.readValue(param, > > > > > > > List.class); > > > > > > > > for(Map<Object, Object> map : list){ > > > > > > > > > result.add(OBJECT_MAPPER.writeValueAsString(map)); > > > > > > > > } > > > > > > > > return result; > > > > > > > > } catch (JsonProcessingException e){ > > > > > > > > LOG.error("failed to convert json to array, param > is: > > > {}", > > > > > > > param, e); > > > > > > > > } > > > > > > > > return result; > > > > > > > > } > > > > > > > > > > > > > > > > > > > > > > > > @Override > > > > > > > > public TypeInformation<List<String>> > > getResultType(Class<?>[] > > > > > > > signature) { > > > > > > > > return Types.LIST(Types.STRING); > > > > > > > > } > > > > > > > > > > > > > > > > } > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > -- > > > > > > > > Best, > > > > Benchao Li > > > > > > > > > > > > > -- > > > > Best, > > Benchao Li > > > -- Best, Benchao Li |
Free forum by Nabble | Edit this page |