dear all:
我用flink 注册一张表: CREATE TABLE dim_mysql ( id int, -- type varchar -- ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:mysql://localhost:3390/test', 'table-name' = 'flink_test', 'driver' = 'com.mysql.cj.jdbc.Driver', 'username' = '****', 'password' = '****', 'lookup.cache.max-rows' = '5000', 'lookup.cache.ttl' = '1s', 'lookup.max-retries' = '3' ) 有没有通过 tableEnv 去获取,字段[id,type] 类型[INTEGER,VARCHAR] 以及属性,map<String,String> 这种。 我看阿里官方有blink 支持自定义sink: publicabstractclassCustomSinkBaseimplementsSerializable{ protectedMap<String,String> userParamsMap;// 您在sql with语句中定义的键值对,但所有的键均为小写 protectedSet<String> primaryKeys;// 您定义的主键字段名 protectedList<String> headerFields;// 标记为header的字段列表 protectedRowTypeInfo rowTypeInfo;// 字段类型和名称 核心需求是:获取定义的表的所有属性,自己实现自己的功能,包括 join sink 等各种逻辑 |
tableEnv 中 可以通过
tableEvn.from(xx).getSchema() 拿到该表的schema信息,但是没法拿到对应的properties。 如果要拿到properties,可以通过catalog的接口得到 [1]。 如果要自定义实现source/sink,可以参考 [2] [1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/catalogs.html [2] https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/sourceSinks.html Best, Godfrey Michael Ran <[hidden email]> 于2020年7月22日周三 下午4:10写道: > dear all: > 我用flink 注册一张表: > CREATE TABLE dim_mysql ( > id int, -- > type varchar -- > ) WITH ( > 'connector' = 'jdbc', > 'url' = 'jdbc:mysql://localhost:3390/test', > 'table-name' = 'flink_test', > 'driver' = 'com.mysql.cj.jdbc.Driver', > 'username' = '****', > 'password' = '****', > 'lookup.cache.max-rows' = '5000', > 'lookup.cache.ttl' = '1s', > 'lookup.max-retries' = '3' > ) > 有没有通过 tableEnv 去获取,字段[id,type] 类型[INTEGER,VARCHAR] > 以及属性,map<String,String> 这种。 > 我看阿里官方有blink 支持自定义sink: > publicabstractclassCustomSinkBaseimplementsSerializable{ > protectedMap<String,String> userParamsMap;// 您在sql with语句中定义的键值对,但所有的键均为小写 > protectedSet<String> primaryKeys;// 您定义的主键字段名 > protectedList<String> headerFields;// 标记为header的字段列表 > protectedRowTypeInfo rowTypeInfo;// 字段类型和名称 > 核心需求是:获取定义的表的所有属性,自己实现自己的功能,包括 join sink 等各种逻辑 |
1.tableEvn.from(xx).getSchema() 我确实通过这个拿到了schema,<br/>2.with properties属性很重要 ,关系我自定义的一些参数设定。<br/>3.关于 catalog 这个东西,是不是只有1.11 版本才能从catalog 获取 with properties 哦? 1.10 you 有支持吗
在 2020-07-22 18:22:22,"godfrey he" <[hidden email]> 写道: >tableEnv 中 可以通过 >tableEvn.from(xx).getSchema() 拿到该表的schema信息,但是没法拿到对应的properties。 >如果要拿到properties,可以通过catalog的接口得到 [1]。 >如果要自定义实现source/sink,可以参考 [2] > >[1] >https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/catalogs.html >[2] >https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/sourceSinks.html > >Best, >Godfrey > > > > > >Michael Ran <[hidden email]> 于2020年7月22日周三 下午4:10写道: > >> dear all: >> 我用flink 注册一张表: >> CREATE TABLE dim_mysql ( >> id int, -- >> type varchar -- >> ) WITH ( >> 'connector' = 'jdbc', >> 'url' = 'jdbc:mysql://localhost:3390/test', >> 'table-name' = 'flink_test', >> 'driver' = 'com.mysql.cj.jdbc.Driver', >> 'username' = '****', >> 'password' = '****', >> 'lookup.cache.max-rows' = '5000', >> 'lookup.cache.ttl' = '1s', >> 'lookup.max-retries' = '3' >> ) >> 有没有通过 tableEnv 去获取,字段[id,type] 类型[INTEGER,VARCHAR] >> 以及属性,map<String,String> 这种。 >> 我看阿里官方有blink 支持自定义sink: >> publicabstractclassCustomSinkBaseimplementsSerializable{ >> protectedMap<String,String> userParamsMap;// 您在sql with语句中定义的键值对,但所有的键均为小写 >> protectedSet<String> primaryKeys;// 您定义的主键字段名 >> protectedList<String> headerFields;// 标记为header的字段列表 >> protectedRowTypeInfo rowTypeInfo;// 字段类型和名称 >> 核心需求是:获取定义的表的所有属性,自己实现自己的功能,包括 join sink 等各种逻辑 |
1.10 也是支持的
Michael Ran <[hidden email]> 于2020年7月22日周三 下午9:07写道: > 1.tableEvn.from(xx).getSchema() 我确实通过这个拿到了schema,<br/>2.with > properties属性很重要 ,关系我自定义的一些参数设定。<br/>3.关于 catalog 这个东西,是不是只有1.11 > 版本才能从catalog 获取 with properties 哦? 1.10 you 有支持吗 > 在 2020-07-22 18:22:22,"godfrey he" <[hidden email]> 写道: > >tableEnv 中 可以通过 > >tableEvn.from(xx).getSchema() 拿到该表的schema信息,但是没法拿到对应的properties。 > >如果要拿到properties,可以通过catalog的接口得到 [1]。 > >如果要自定义实现source/sink,可以参考 [2] > > > >[1] > > > https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/catalogs.html > >[2] > > > https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/sourceSinks.html > > > >Best, > >Godfrey > > > > > > > > > > > >Michael Ran <[hidden email]> 于2020年7月22日周三 下午4:10写道: > > > >> dear all: > >> 我用flink 注册一张表: > >> CREATE TABLE dim_mysql ( > >> id int, -- > >> type varchar -- > >> ) WITH ( > >> 'connector' = 'jdbc', > >> 'url' = 'jdbc:mysql://localhost:3390/test', > >> 'table-name' = 'flink_test', > >> 'driver' = 'com.mysql.cj.jdbc.Driver', > >> 'username' = '****', > >> 'password' = '****', > >> 'lookup.cache.max-rows' = '5000', > >> 'lookup.cache.ttl' = '1s', > >> 'lookup.max-retries' = '3' > >> ) > >> 有没有通过 tableEnv 去获取,字段[id,type] 类型[INTEGER,VARCHAR] > >> 以及属性,map<String,String> 这种。 > >> 我看阿里官方有blink 支持自定义sink: > >> publicabstractclassCustomSinkBaseimplementsSerializable{ > >> protectedMap<String,String> userParamsMap;// 您在sql > with语句中定义的键值对,但所有的键均为小写 > >> protectedSet<String> primaryKeys;// 您定义的主键字段名 > >> protectedList<String> headerFields;// 标记为header的字段列表 > >> protectedRowTypeInfo rowTypeInfo;// 字段类型和名称 > >> 核心需求是:获取定义的表的所有属性,自己实现自己的功能,包括 join sink 等各种逻辑 > |
Free forum by Nabble | Edit this page |