UDF:Type is not supported: ANY

classic Classic list List threaded Threaded
9 messages Options
Reply | Threaded
Open this post in threaded view
|

UDF:Type is not supported: ANY

zilong xiao
最近在写Flink SQL处理数据的时候发现Flink自带的function不太能满足使用,像很常见的Json2Array
Json2Map貌似官方都没有实现,所以需要自定义函数来实现,然后我自己尝试用Jackson来实现时发现在语法检查时总是会报 `Type is not
supported:
ANY`,个人猜想这个是不是和Java泛型的特性有关,由于Java是假泛型,最终泛型会被擦除编程Object才会引发这个异常呢?想到Flink本身也有一个字符串转容器的函数
STR_TO_MAP,看了下该函数实现是用Scala编写,不确定该异常是否真是由泛型引起,如果是,如果想要Java写Json2Array Json2Map
udf应该怎么操作呢?求前辈指导

udfd代码如下:

public class Json2List extends ScalarFunction {

   private static final Logger LOG = LoggerFactory.getLogger(Json2List.class);

   private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper()
      .configure(JsonParser.Feature.ALLOW_SINGLE_QUOTES, true)
      .configure(JsonParser.Feature.ALLOW_UNQUOTED_CONTROL_CHARS, true) ;

   public Json2List(){}

   public List<String> eval(String param) {
      List<String> result = new ArrayList<>();
      try {
         List<Map<Object, Object>> list =
OBJECT_MAPPER.readValue(param, List.class);
         for(Map<Object, Object> map : list){
            result.add(OBJECT_MAPPER.writeValueAsString(map));
         }
         return result;
      } catch (JsonProcessingException e){
         LOG.error("failed to convert json to array, param is: {}", param, e);
      }
      return result;
   }


   @Override
   public TypeInformation<List<String>> getResultType(Class<?>[] signature) {
      return Types.LIST(Types.STRING);
   }

}
Reply | Threaded
Open this post in threaded view
|

Re: UDF:Type is not supported: ANY

zilong xiao
目前转List可以用数组代替,Map貌似没法成功运行

zilong xiao <[hidden email]> 于2020年8月3日周一 上午10:43写道:

> 最近在写Flink SQL处理数据的时候发现Flink自带的function不太能满足使用,像很常见的Json2Array
> Json2Map貌似官方都没有实现,所以需要自定义函数来实现,然后我自己尝试用Jackson来实现时发现在语法检查时总是会报 `Type is not
> supported:
> ANY`,个人猜想这个是不是和Java泛型的特性有关,由于Java是假泛型,最终泛型会被擦除编程Object才会引发这个异常呢?想到Flink本身也有一个字符串转容器的函数
> STR_TO_MAP,看了下该函数实现是用Scala编写,不确定该异常是否真是由泛型引起,如果是,如果想要Java写Json2Array Json2Map
> udf应该怎么操作呢?求前辈指导
>
> udfd代码如下:
>
> public class Json2List extends ScalarFunction {
>
>    private static final Logger LOG = LoggerFactory.getLogger(Json2List.class);
>
>    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper()
>       .configure(JsonParser.Feature.ALLOW_SINGLE_QUOTES, true)
>       .configure(JsonParser.Feature.ALLOW_UNQUOTED_CONTROL_CHARS, true) ;
>
>    public Json2List(){}
>
>    public List<String> eval(String param) {
>       List<String> result = new ArrayList<>();
>       try {
>          List<Map<Object, Object>> list = OBJECT_MAPPER.readValue(param, List.class);
>          for(Map<Object, Object> map : list){
>             result.add(OBJECT_MAPPER.writeValueAsString(map));
>          }
>          return result;
>       } catch (JsonProcessingException e){
>          LOG.error("failed to convert json to array, param is: {}", param, e);
>       }
>       return result;
>    }
>
>
>    @Override
>    public TypeInformation<List<String>> getResultType(Class<?>[] signature) {
>       return Types.LIST(Types.STRING);
>    }
>
> }
>
>
Reply | Threaded
Open this post in threaded view
|

Re: UDF:Type is not supported: ANY

godfrey he
你把Map<Object, Object>换为Map<String, String>试试

zilong xiao <[hidden email]> 于2020年8月3日周一 下午4:56写道:

> 目前转List可以用数组代替,Map貌似没法成功运行
>
> zilong xiao <[hidden email]> 于2020年8月3日周一 上午10:43写道:
>
> > 最近在写Flink SQL处理数据的时候发现Flink自带的function不太能满足使用,像很常见的Json2Array
> > Json2Map貌似官方都没有实现,所以需要自定义函数来实现,然后我自己尝试用Jackson来实现时发现在语法检查时总是会报 `Type is
> not
> > supported:
> >
> ANY`,个人猜想这个是不是和Java泛型的特性有关,由于Java是假泛型,最终泛型会被擦除编程Object才会引发这个异常呢?想到Flink本身也有一个字符串转容器的函数
> > STR_TO_MAP,看了下该函数实现是用Scala编写,不确定该异常是否真是由泛型引起,如果是,如果想要Java写Json2Array
> Json2Map
> > udf应该怎么操作呢?求前辈指导
> >
> > udfd代码如下:
> >
> > public class Json2List extends ScalarFunction {
> >
> >    private static final Logger LOG =
> LoggerFactory.getLogger(Json2List.class);
> >
> >    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper()
> >       .configure(JsonParser.Feature.ALLOW_SINGLE_QUOTES, true)
> >       .configure(JsonParser.Feature.ALLOW_UNQUOTED_CONTROL_CHARS, true) ;
> >
> >    public Json2List(){}
> >
> >    public List<String> eval(String param) {
> >       List<String> result = new ArrayList<>();
> >       try {
> >          List<Map<Object, Object>> list = OBJECT_MAPPER.readValue(param,
> List.class);
> >          for(Map<Object, Object> map : list){
> >             result.add(OBJECT_MAPPER.writeValueAsString(map));
> >          }
> >          return result;
> >       } catch (JsonProcessingException e){
> >          LOG.error("failed to convert json to array, param is: {}",
> param, e);
> >       }
> >       return result;
> >    }
> >
> >
> >    @Override
> >    public TypeInformation<List<String>> getResultType(Class<?>[]
> signature) {
> >       return Types.LIST(Types.STRING);
> >    }
> >
> > }
> >
> >
>
Reply | Threaded
Open this post in threaded view
|

Re: UDF:Type is not supported: ANY

zilong xiao
不行的,试过了,遍历map把里面的entity全部toString都不行,应该是Java泛型机制的原因,不知道社区大佬怎么看待这个问题

godfrey he <[hidden email]> 于2020年8月3日周一 下午7:50写道:

> 你把Map<Object, Object>换为Map<String, String>试试
>
> zilong xiao <[hidden email]> 于2020年8月3日周一 下午4:56写道:
>
> > 目前转List可以用数组代替,Map貌似没法成功运行
> >
> > zilong xiao <[hidden email]> 于2020年8月3日周一 上午10:43写道:
> >
> > > 最近在写Flink SQL处理数据的时候发现Flink自带的function不太能满足使用,像很常见的Json2Array
> > > Json2Map貌似官方都没有实现,所以需要自定义函数来实现,然后我自己尝试用Jackson来实现时发现在语法检查时总是会报 `Type is
> > not
> > > supported:
> > >
> >
> ANY`,个人猜想这个是不是和Java泛型的特性有关,由于Java是假泛型,最终泛型会被擦除编程Object才会引发这个异常呢?想到Flink本身也有一个字符串转容器的函数
> > > STR_TO_MAP,看了下该函数实现是用Scala编写,不确定该异常是否真是由泛型引起,如果是,如果想要Java写Json2Array
> > Json2Map
> > > udf应该怎么操作呢?求前辈指导
> > >
> > > udfd代码如下:
> > >
> > > public class Json2List extends ScalarFunction {
> > >
> > >    private static final Logger LOG =
> > LoggerFactory.getLogger(Json2List.class);
> > >
> > >    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper()
> > >       .configure(JsonParser.Feature.ALLOW_SINGLE_QUOTES, true)
> > >       .configure(JsonParser.Feature.ALLOW_UNQUOTED_CONTROL_CHARS,
> true) ;
> > >
> > >    public Json2List(){}
> > >
> > >    public List<String> eval(String param) {
> > >       List<String> result = new ArrayList<>();
> > >       try {
> > >          List<Map<Object, Object>> list =
> OBJECT_MAPPER.readValue(param,
> > List.class);
> > >          for(Map<Object, Object> map : list){
> > >             result.add(OBJECT_MAPPER.writeValueAsString(map));
> > >          }
> > >          return result;
> > >       } catch (JsonProcessingException e){
> > >          LOG.error("failed to convert json to array, param is: {}",
> > param, e);
> > >       }
> > >       return result;
> > >    }
> > >
> > >
> > >    @Override
> > >    public TypeInformation<List<String>> getResultType(Class<?>[]
> > signature) {
> > >       return Types.LIST(Types.STRING);
> > >    }
> > >
> > > }
> > >
> > >
> >
>
Reply | Threaded
Open this post in threaded view
|

Re: UDF:Type is not supported: ANY

Benchao Li-2
Hi zilong,

SQL里面的ARRAY类型,对应的legacy type,应该是Types.PRIMITIVE_ARRAY或者Types.OBJECT_ARRAY,
其他类型的type information会被当做any类型来处理。
这里应该跟泛型没有关系,就是在实现的时候并没有考虑将Types.LIST(Types.STRING)当做SQL里面的ARRAY类型。
支持List作为ARRAY的数据,应该要在1.12才能支持[1]。

[1] https://issues.apache.org/jira/browse/FLINK-18417

zilong xiao <[hidden email]> 于2020年8月3日周一 下午8:23写道:

> 不行的,试过了,遍历map把里面的entity全部toString都不行,应该是Java泛型机制的原因,不知道社区大佬怎么看待这个问题
>
> godfrey he <[hidden email]> 于2020年8月3日周一 下午7:50写道:
>
> > 你把Map<Object, Object>换为Map<String, String>试试
> >
> > zilong xiao <[hidden email]> 于2020年8月3日周一 下午4:56写道:
> >
> > > 目前转List可以用数组代替,Map貌似没法成功运行
> > >
> > > zilong xiao <[hidden email]> 于2020年8月3日周一 上午10:43写道:
> > >
> > > > 最近在写Flink SQL处理数据的时候发现Flink自带的function不太能满足使用,像很常见的Json2Array
> > > > Json2Map貌似官方都没有实现,所以需要自定义函数来实现,然后我自己尝试用Jackson来实现时发现在语法检查时总是会报 `Type
> is
> > > not
> > > > supported:
> > > >
> > >
> >
> ANY`,个人猜想这个是不是和Java泛型的特性有关,由于Java是假泛型,最终泛型会被擦除编程Object才会引发这个异常呢?想到Flink本身也有一个字符串转容器的函数
> > > > STR_TO_MAP,看了下该函数实现是用Scala编写,不确定该异常是否真是由泛型引起,如果是,如果想要Java写Json2Array
> > > Json2Map
> > > > udf应该怎么操作呢?求前辈指导
> > > >
> > > > udfd代码如下:
> > > >
> > > > public class Json2List extends ScalarFunction {
> > > >
> > > >    private static final Logger LOG =
> > > LoggerFactory.getLogger(Json2List.class);
> > > >
> > > >    private static final ObjectMapper OBJECT_MAPPER = new
> ObjectMapper()
> > > >       .configure(JsonParser.Feature.ALLOW_SINGLE_QUOTES, true)
> > > >       .configure(JsonParser.Feature.ALLOW_UNQUOTED_CONTROL_CHARS,
> > true) ;
> > > >
> > > >    public Json2List(){}
> > > >
> > > >    public List<String> eval(String param) {
> > > >       List<String> result = new ArrayList<>();
> > > >       try {
> > > >          List<Map<Object, Object>> list =
> > OBJECT_MAPPER.readValue(param,
> > > List.class);
> > > >          for(Map<Object, Object> map : list){
> > > >             result.add(OBJECT_MAPPER.writeValueAsString(map));
> > > >          }
> > > >          return result;
> > > >       } catch (JsonProcessingException e){
> > > >          LOG.error("failed to convert json to array, param is: {}",
> > > param, e);
> > > >       }
> > > >       return result;
> > > >    }
> > > >
> > > >
> > > >    @Override
> > > >    public TypeInformation<List<String>> getResultType(Class<?>[]
> > > signature) {
> > > >       return Types.LIST(Types.STRING);
> > > >    }
> > > >
> > > > }
> > > >
> > > >
> > >
> >
>


--

Best,
Benchao Li
Reply | Threaded
Open this post in threaded view
|

Re: UDF:Type is not supported: ANY

zilong xiao
感谢benchao大佬回答,对于Java用户而言,Map容器用的也比较多,目前有用户需要从json字符串中取对应key的值,目前的做法是提供了一个getValueFromJson的UDF,每次要取值时都要将json反序列化成Map然后get(key),有什么方法可以使得UDF直接返回Map类型吗?这样就可以避免每次都去反序列化然后取值

Benchao Li <[hidden email]> 于2020年8月5日周三 下午11:49写道:

> Hi zilong,
>
> SQL里面的ARRAY类型,对应的legacy type,应该是Types.PRIMITIVE_ARRAY或者Types.OBJECT_ARRAY,
> 其他类型的type information会被当做any类型来处理。
> 这里应该跟泛型没有关系,就是在实现的时候并没有考虑将Types.LIST(Types.STRING)当做SQL里面的ARRAY类型。
> 支持List作为ARRAY的数据,应该要在1.12才能支持[1]。
>
> [1] https://issues.apache.org/jira/browse/FLINK-18417
>
> zilong xiao <[hidden email]> 于2020年8月3日周一 下午8:23写道:
>
> > 不行的,试过了,遍历map把里面的entity全部toString都不行,应该是Java泛型机制的原因,不知道社区大佬怎么看待这个问题
> >
> > godfrey he <[hidden email]> 于2020年8月3日周一 下午7:50写道:
> >
> > > 你把Map<Object, Object>换为Map<String, String>试试
> > >
> > > zilong xiao <[hidden email]> 于2020年8月3日周一 下午4:56写道:
> > >
> > > > 目前转List可以用数组代替,Map貌似没法成功运行
> > > >
> > > > zilong xiao <[hidden email]> 于2020年8月3日周一 上午10:43写道:
> > > >
> > > > > 最近在写Flink SQL处理数据的时候发现Flink自带的function不太能满足使用,像很常见的Json2Array
> > > > > Json2Map貌似官方都没有实现,所以需要自定义函数来实现,然后我自己尝试用Jackson来实现时发现在语法检查时总是会报
> `Type
> > is
> > > > not
> > > > > supported:
> > > > >
> > > >
> > >
> >
> ANY`,个人猜想这个是不是和Java泛型的特性有关,由于Java是假泛型,最终泛型会被擦除编程Object才会引发这个异常呢?想到Flink本身也有一个字符串转容器的函数
> > > > >
> STR_TO_MAP,看了下该函数实现是用Scala编写,不确定该异常是否真是由泛型引起,如果是,如果想要Java写Json2Array
> > > > Json2Map
> > > > > udf应该怎么操作呢?求前辈指导
> > > > >
> > > > > udfd代码如下:
> > > > >
> > > > > public class Json2List extends ScalarFunction {
> > > > >
> > > > >    private static final Logger LOG =
> > > > LoggerFactory.getLogger(Json2List.class);
> > > > >
> > > > >    private static final ObjectMapper OBJECT_MAPPER = new
> > ObjectMapper()
> > > > >       .configure(JsonParser.Feature.ALLOW_SINGLE_QUOTES, true)
> > > > >       .configure(JsonParser.Feature.ALLOW_UNQUOTED_CONTROL_CHARS,
> > > true) ;
> > > > >
> > > > >    public Json2List(){}
> > > > >
> > > > >    public List<String> eval(String param) {
> > > > >       List<String> result = new ArrayList<>();
> > > > >       try {
> > > > >          List<Map<Object, Object>> list =
> > > OBJECT_MAPPER.readValue(param,
> > > > List.class);
> > > > >          for(Map<Object, Object> map : list){
> > > > >             result.add(OBJECT_MAPPER.writeValueAsString(map));
> > > > >          }
> > > > >          return result;
> > > > >       } catch (JsonProcessingException e){
> > > > >          LOG.error("failed to convert json to array, param is: {}",
> > > > param, e);
> > > > >       }
> > > > >       return result;
> > > > >    }
> > > > >
> > > > >
> > > > >    @Override
> > > > >    public TypeInformation<List<String>> getResultType(Class<?>[]
> > > > signature) {
> > > > >       return Types.LIST(Types.STRING);
> > > > >    }
> > > > >
> > > > > }
> > > > >
> > > > >
> > > >
> > >
> >
>
>
> --
>
> Best,
> Benchao Li
>
Reply | Threaded
Open this post in threaded view
|

Re: UDF:Type is not supported: ANY

Benchao Li-2
可以直接返回Map类型呀,比如:

public class String2Map extends ScalarFunction {

   public Map<String, String> eval(String param) throws Exception {
      Map<String, String> map = new HashMap<>();
      // ...
      return map;
   }

   @Override
   public TypeInformation<?> getResultType(Class<?>[] signature) {
      return Types.MAP(Types.STRING, Types.STRING);
   }

}


zilong xiao <[hidden email]> 于2020年8月6日周四 上午10:24写道:

>
> 感谢benchao大佬回答,对于Java用户而言,Map容器用的也比较多,目前有用户需要从json字符串中取对应key的值,目前的做法是提供了一个getValueFromJson的UDF,每次要取值时都要将json反序列化成Map然后get(key),有什么方法可以使得UDF直接返回Map类型吗?这样就可以避免每次都去反序列化然后取值
>
> Benchao Li <[hidden email]> 于2020年8月5日周三 下午11:49写道:
>
> > Hi zilong,
> >
> > SQL里面的ARRAY类型,对应的legacy
> type,应该是Types.PRIMITIVE_ARRAY或者Types.OBJECT_ARRAY,
> > 其他类型的type information会被当做any类型来处理。
> > 这里应该跟泛型没有关系,就是在实现的时候并没有考虑将Types.LIST(Types.STRING)当做SQL里面的ARRAY类型。
> > 支持List作为ARRAY的数据,应该要在1.12才能支持[1]。
> >
> > [1] https://issues.apache.org/jira/browse/FLINK-18417
> >
> > zilong xiao <[hidden email]> 于2020年8月3日周一 下午8:23写道:
> >
> > > 不行的,试过了,遍历map把里面的entity全部toString都不行,应该是Java泛型机制的原因,不知道社区大佬怎么看待这个问题
> > >
> > > godfrey he <[hidden email]> 于2020年8月3日周一 下午7:50写道:
> > >
> > > > 你把Map<Object, Object>换为Map<String, String>试试
> > > >
> > > > zilong xiao <[hidden email]> 于2020年8月3日周一 下午4:56写道:
> > > >
> > > > > 目前转List可以用数组代替,Map貌似没法成功运行
> > > > >
> > > > > zilong xiao <[hidden email]> 于2020年8月3日周一 上午10:43写道:
> > > > >
> > > > > > 最近在写Flink SQL处理数据的时候发现Flink自带的function不太能满足使用,像很常见的Json2Array
> > > > > > Json2Map貌似官方都没有实现,所以需要自定义函数来实现,然后我自己尝试用Jackson来实现时发现在语法检查时总是会报
> > `Type
> > > is
> > > > > not
> > > > > > supported:
> > > > > >
> > > > >
> > > >
> > >
> >
> ANY`,个人猜想这个是不是和Java泛型的特性有关,由于Java是假泛型,最终泛型会被擦除编程Object才会引发这个异常呢?想到Flink本身也有一个字符串转容器的函数
> > > > > >
> > STR_TO_MAP,看了下该函数实现是用Scala编写,不确定该异常是否真是由泛型引起,如果是,如果想要Java写Json2Array
> > > > > Json2Map
> > > > > > udf应该怎么操作呢?求前辈指导
> > > > > >
> > > > > > udfd代码如下:
> > > > > >
> > > > > > public class Json2List extends ScalarFunction {
> > > > > >
> > > > > >    private static final Logger LOG =
> > > > > LoggerFactory.getLogger(Json2List.class);
> > > > > >
> > > > > >    private static final ObjectMapper OBJECT_MAPPER = new
> > > ObjectMapper()
> > > > > >       .configure(JsonParser.Feature.ALLOW_SINGLE_QUOTES, true)
> > > > > >       .configure(JsonParser.Feature.ALLOW_UNQUOTED_CONTROL_CHARS,
> > > > true) ;
> > > > > >
> > > > > >    public Json2List(){}
> > > > > >
> > > > > >    public List<String> eval(String param) {
> > > > > >       List<String> result = new ArrayList<>();
> > > > > >       try {
> > > > > >          List<Map<Object, Object>> list =
> > > > OBJECT_MAPPER.readValue(param,
> > > > > List.class);
> > > > > >          for(Map<Object, Object> map : list){
> > > > > >             result.add(OBJECT_MAPPER.writeValueAsString(map));
> > > > > >          }
> > > > > >          return result;
> > > > > >       } catch (JsonProcessingException e){
> > > > > >          LOG.error("failed to convert json to array, param is:
> {}",
> > > > > param, e);
> > > > > >       }
> > > > > >       return result;
> > > > > >    }
> > > > > >
> > > > > >
> > > > > >    @Override
> > > > > >    public TypeInformation<List<String>> getResultType(Class<?>[]
> > > > > signature) {
> > > > > >       return Types.LIST(Types.STRING);
> > > > > >    }
> > > > > >
> > > > > > }
> > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> >
> > --
> >
> > Best,
> > Benchao Li
> >
>


--

Best,
Benchao Li
Reply | Threaded
Open this post in threaded view
|

Re: UDF:Type is not supported: ANY

zilong xiao
我这么写过,貌似不行,下面是我的代码,可否看下是否可行?

public class Json2Map extends ScalarFunction {

   private static final Logger LOG = LoggerFactory.getLogger(Json2Map.class);

   private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

   public Json2Map(){}

   public Map<String, String> eval(String param) {
      Map<String, String> result = new HashMap<>();
      try {
         if (param == null) {
            return result;
         }
         result = OBJECT_MAPPER.readValue(param, Map.class);
         // 遍历toString 貌似也不行
         //for(Object obj : tmp.keySet()){
         // result.put(String.valueOf(obj), String.valueOf(tmp.get(obj)));
         //}
         LOG.info("result is: {}", result);
      } catch (JsonProcessingException e){
         LOG.error("failed to convert json to map, param is: {}", param, e);
      }
      return result;
   }


   @Override
   public TypeInformation<Map<String, String>>
getResultType(Class<?>[] signature) {
      return Types.MAP(Types.STRING, Types.STRING);
   }

}


Benchao Li <[hidden email]> 于2020年8月6日周四 上午11:04写道:

> 可以直接返回Map类型呀,比如:
>
> public class String2Map extends ScalarFunction {
>
>    public Map<String, String> eval(String param) throws Exception {
>       Map<String, String> map = new HashMap<>();
>       // ...
>       return map;
>    }
>
>    @Override
>    public TypeInformation<?> getResultType(Class<?>[] signature) {
>       return Types.MAP(Types.STRING, Types.STRING);
>    }
>
> }
>
>
> zilong xiao <[hidden email]> 于2020年8月6日周四 上午10:24写道:
>
> >
> >
> 感谢benchao大佬回答,对于Java用户而言,Map容器用的也比较多,目前有用户需要从json字符串中取对应key的值,目前的做法是提供了一个getValueFromJson的UDF,每次要取值时都要将json反序列化成Map然后get(key),有什么方法可以使得UDF直接返回Map类型吗?这样就可以避免每次都去反序列化然后取值
> >
> > Benchao Li <[hidden email]> 于2020年8月5日周三 下午11:49写道:
> >
> > > Hi zilong,
> > >
> > > SQL里面的ARRAY类型,对应的legacy
> > type,应该是Types.PRIMITIVE_ARRAY或者Types.OBJECT_ARRAY,
> > > 其他类型的type information会被当做any类型来处理。
> > > 这里应该跟泛型没有关系,就是在实现的时候并没有考虑将Types.LIST(Types.STRING)当做SQL里面的ARRAY类型。
> > > 支持List作为ARRAY的数据,应该要在1.12才能支持[1]。
> > >
> > > [1] https://issues.apache.org/jira/browse/FLINK-18417
> > >
> > > zilong xiao <[hidden email]> 于2020年8月3日周一 下午8:23写道:
> > >
> > > > 不行的,试过了,遍历map把里面的entity全部toString都不行,应该是Java泛型机制的原因,不知道社区大佬怎么看待这个问题
> > > >
> > > > godfrey he <[hidden email]> 于2020年8月3日周一 下午7:50写道:
> > > >
> > > > > 你把Map<Object, Object>换为Map<String, String>试试
> > > > >
> > > > > zilong xiao <[hidden email]> 于2020年8月3日周一 下午4:56写道:
> > > > >
> > > > > > 目前转List可以用数组代替,Map貌似没法成功运行
> > > > > >
> > > > > > zilong xiao <[hidden email]> 于2020年8月3日周一 上午10:43写道:
> > > > > >
> > > > > > > 最近在写Flink SQL处理数据的时候发现Flink自带的function不太能满足使用,像很常见的Json2Array
> > > > > > > Json2Map貌似官方都没有实现,所以需要自定义函数来实现,然后我自己尝试用Jackson来实现时发现在语法检查时总是会报
> > > `Type
> > > > is
> > > > > > not
> > > > > > > supported:
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> ANY`,个人猜想这个是不是和Java泛型的特性有关,由于Java是假泛型,最终泛型会被擦除编程Object才会引发这个异常呢?想到Flink本身也有一个字符串转容器的函数
> > > > > > >
> > > STR_TO_MAP,看了下该函数实现是用Scala编写,不确定该异常是否真是由泛型引起,如果是,如果想要Java写Json2Array
> > > > > > Json2Map
> > > > > > > udf应该怎么操作呢?求前辈指导
> > > > > > >
> > > > > > > udfd代码如下:
> > > > > > >
> > > > > > > public class Json2List extends ScalarFunction {
> > > > > > >
> > > > > > >    private static final Logger LOG =
> > > > > > LoggerFactory.getLogger(Json2List.class);
> > > > > > >
> > > > > > >    private static final ObjectMapper OBJECT_MAPPER = new
> > > > ObjectMapper()
> > > > > > >       .configure(JsonParser.Feature.ALLOW_SINGLE_QUOTES, true)
> > > > > > >
>  .configure(JsonParser.Feature.ALLOW_UNQUOTED_CONTROL_CHARS,
> > > > > true) ;
> > > > > > >
> > > > > > >    public Json2List(){}
> > > > > > >
> > > > > > >    public List<String> eval(String param) {
> > > > > > >       List<String> result = new ArrayList<>();
> > > > > > >       try {
> > > > > > >          List<Map<Object, Object>> list =
> > > > > OBJECT_MAPPER.readValue(param,
> > > > > > List.class);
> > > > > > >          for(Map<Object, Object> map : list){
> > > > > > >             result.add(OBJECT_MAPPER.writeValueAsString(map));
> > > > > > >          }
> > > > > > >          return result;
> > > > > > >       } catch (JsonProcessingException e){
> > > > > > >          LOG.error("failed to convert json to array, param is:
> > {}",
> > > > > > param, e);
> > > > > > >       }
> > > > > > >       return result;
> > > > > > >    }
> > > > > > >
> > > > > > >
> > > > > > >    @Override
> > > > > > >    public TypeInformation<List<String>>
> getResultType(Class<?>[]
> > > > > > signature) {
> > > > > > >       return Types.LIST(Types.STRING);
> > > > > > >    }
> > > > > > >
> > > > > > > }
> > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> > >
> > > --
> > >
> > > Best,
> > > Benchao Li
> > >
> >
>
>
> --
>
> Best,
> Benchao Li
>
Reply | Threaded
Open this post in threaded view
|

Re: UDF:Type is not supported: ANY

Benchao Li-2
看起来写法没啥问题,我们就是这么用的。
你用的是哪个版本的Flink?然后是怎么注册的UDF呢?

zilong xiao <[hidden email]> 于2020年8月6日周四 下午12:06写道:

> 我这么写过,貌似不行,下面是我的代码,可否看下是否可行?
>
> public class Json2Map extends ScalarFunction {
>
>    private static final Logger LOG =
> LoggerFactory.getLogger(Json2Map.class);
>
>    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
>
>    public Json2Map(){}
>
>    public Map<String, String> eval(String param) {
>       Map<String, String> result = new HashMap<>();
>       try {
>          if (param == null) {
>             return result;
>          }
>          result = OBJECT_MAPPER.readValue(param, Map.class);
>          // 遍历toString 貌似也不行
>          //for(Object obj : tmp.keySet()){
>          // result.put(String.valueOf(obj), String.valueOf(tmp.get(obj)));
>          //}
>          LOG.info("result is: {}", result);
>       } catch (JsonProcessingException e){
>          LOG.error("failed to convert json to map, param is: {}", param,
> e);
>       }
>       return result;
>    }
>
>
>    @Override
>    public TypeInformation<Map<String, String>>
> getResultType(Class<?>[] signature) {
>       return Types.MAP(Types.STRING, Types.STRING);
>    }
>
> }
>
>
> Benchao Li <[hidden email]> 于2020年8月6日周四 上午11:04写道:
>
> > 可以直接返回Map类型呀,比如:
> >
> > public class String2Map extends ScalarFunction {
> >
> >    public Map<String, String> eval(String param) throws Exception {
> >       Map<String, String> map = new HashMap<>();
> >       // ...
> >       return map;
> >    }
> >
> >    @Override
> >    public TypeInformation<?> getResultType(Class<?>[] signature) {
> >       return Types.MAP(Types.STRING, Types.STRING);
> >    }
> >
> > }
> >
> >
> > zilong xiao <[hidden email]> 于2020年8月6日周四 上午10:24写道:
> >
> > >
> > >
> >
> 感谢benchao大佬回答,对于Java用户而言,Map容器用的也比较多,目前有用户需要从json字符串中取对应key的值,目前的做法是提供了一个getValueFromJson的UDF,每次要取值时都要将json反序列化成Map然后get(key),有什么方法可以使得UDF直接返回Map类型吗?这样就可以避免每次都去反序列化然后取值
> > >
> > > Benchao Li <[hidden email]> 于2020年8月5日周三 下午11:49写道:
> > >
> > > > Hi zilong,
> > > >
> > > > SQL里面的ARRAY类型,对应的legacy
> > > type,应该是Types.PRIMITIVE_ARRAY或者Types.OBJECT_ARRAY,
> > > > 其他类型的type information会被当做any类型来处理。
> > > > 这里应该跟泛型没有关系,就是在实现的时候并没有考虑将Types.LIST(Types.STRING)当做SQL里面的ARRAY类型。
> > > > 支持List作为ARRAY的数据,应该要在1.12才能支持[1]。
> > > >
> > > > [1] https://issues.apache.org/jira/browse/FLINK-18417
> > > >
> > > > zilong xiao <[hidden email]> 于2020年8月3日周一 下午8:23写道:
> > > >
> > > > > 不行的,试过了,遍历map把里面的entity全部toString都不行,应该是Java泛型机制的原因,不知道社区大佬怎么看待这个问题
> > > > >
> > > > > godfrey he <[hidden email]> 于2020年8月3日周一 下午7:50写道:
> > > > >
> > > > > > 你把Map<Object, Object>换为Map<String, String>试试
> > > > > >
> > > > > > zilong xiao <[hidden email]> 于2020年8月3日周一 下午4:56写道:
> > > > > >
> > > > > > > 目前转List可以用数组代替,Map貌似没法成功运行
> > > > > > >
> > > > > > > zilong xiao <[hidden email]> 于2020年8月3日周一 上午10:43写道:
> > > > > > >
> > > > > > > > 最近在写Flink SQL处理数据的时候发现Flink自带的function不太能满足使用,像很常见的Json2Array
> > > > > > > >
> Json2Map貌似官方都没有实现,所以需要自定义函数来实现,然后我自己尝试用Jackson来实现时发现在语法检查时总是会报
> > > > `Type
> > > > > is
> > > > > > > not
> > > > > > > > supported:
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> ANY`,个人猜想这个是不是和Java泛型的特性有关,由于Java是假泛型,最终泛型会被擦除编程Object才会引发这个异常呢?想到Flink本身也有一个字符串转容器的函数
> > > > > > > >
> > > > STR_TO_MAP,看了下该函数实现是用Scala编写,不确定该异常是否真是由泛型引起,如果是,如果想要Java写Json2Array
> > > > > > > Json2Map
> > > > > > > > udf应该怎么操作呢?求前辈指导
> > > > > > > >
> > > > > > > > udfd代码如下:
> > > > > > > >
> > > > > > > > public class Json2List extends ScalarFunction {
> > > > > > > >
> > > > > > > >    private static final Logger LOG =
> > > > > > > LoggerFactory.getLogger(Json2List.class);
> > > > > > > >
> > > > > > > >    private static final ObjectMapper OBJECT_MAPPER = new
> > > > > ObjectMapper()
> > > > > > > >       .configure(JsonParser.Feature.ALLOW_SINGLE_QUOTES,
> true)
> > > > > > > >
> >  .configure(JsonParser.Feature.ALLOW_UNQUOTED_CONTROL_CHARS,
> > > > > > true) ;
> > > > > > > >
> > > > > > > >    public Json2List(){}
> > > > > > > >
> > > > > > > >    public List<String> eval(String param) {
> > > > > > > >       List<String> result = new ArrayList<>();
> > > > > > > >       try {
> > > > > > > >          List<Map<Object, Object>> list =
> > > > > > OBJECT_MAPPER.readValue(param,
> > > > > > > List.class);
> > > > > > > >          for(Map<Object, Object> map : list){
> > > > > > > >
>  result.add(OBJECT_MAPPER.writeValueAsString(map));
> > > > > > > >          }
> > > > > > > >          return result;
> > > > > > > >       } catch (JsonProcessingException e){
> > > > > > > >          LOG.error("failed to convert json to array, param
> is:
> > > {}",
> > > > > > > param, e);
> > > > > > > >       }
> > > > > > > >       return result;
> > > > > > > >    }
> > > > > > > >
> > > > > > > >
> > > > > > > >    @Override
> > > > > > > >    public TypeInformation<List<String>>
> > getResultType(Class<?>[]
> > > > > > > signature) {
> > > > > > > >       return Types.LIST(Types.STRING);
> > > > > > > >    }
> > > > > > > >
> > > > > > > > }
> > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > > >
> > > > --
> > > >
> > > > Best,
> > > > Benchao Li
> > > >
> > >
> >
> >
> > --
> >
> > Best,
> > Benchao Li
> >
>


--

Best,
Benchao Li