Hi!
Optional.of(new customPartitioner())
Ye Chen wrote
> 各位好,我想实现自定义partition,继承了 FlinkKafkaPartitioner 后,使用会报错,简化的代码如下。
> //自定义partition
> public class customPartitioner extends FlinkKafkaPartitioner
> <String>
> {
> @Override
> public int partition(String record, byte[] key, byte[] value, String
> targetTopic, int[] partitions) {
> return 0;
> }
> }
>
>
> DataStream
> <String>
> stream = 。。。
> FlinkKafkaProducer
> <String>
> myProducer = new FlinkKafkaProducer<>(
> "test_topic",
> new SimpleStringSchema(),
> properties,
> new customPartitioner()
> );
> stream.addSink(myProducer);
>
>
> //上面的代码,编辑器中编译FlinkKafkaProducer会报错,【Error:(55, 49) java:
> 无法推断org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer<>的类型参数】
> //去掉new
> customPartitioner(),不使用自定义partition,FlinkKafkaProducer就不报错,感觉是构造函数对应不上,但是查看构造函数源码有这个构造函数
>
>
>
>
> 查看FlinkKafkaProducer源码如下,我上面的写法有问题么?
> public FlinkKafkaProducer(
> String topicId,
> SerializationSchema
> <IN>
> serializationSchema,
> Properties producerConfig,
> Optional<FlinkKafkaPartitioner<IN>> customPartitioner) {
> this(
> topicId,
> serializationSchema,
> producerConfig,
> customPartitioner.orElse(null),
> Semantic.AT_LEAST_ONCE,
> DEFAULT_KAFKA_PRODUCERS_POOL_SIZE);
> }
--
Sent from:
http://apache-flink.147419.n8.nabble.com/