I searched the earlier mails on this list and it seems nobody has hit the same problem as mine.
Jars under lib:
flink-csv-1.11.3.jar
flink-dist_2.11-1.11.3.jar
flink-jdbc_2.11-1.11.3.jar
flink-json-1.11.3.jar
flink-shaded-zookeeper-3.4.14.jar
flink-sql-connector-elasticsearch6_2.11-1.11.3.jar
flink-sql-connector-kafka_2.11-1.11.3.jar
flink-table_2.11-1.11.3.jar
flink-table-blink_2.11-1.11.3.jar
log4j-1.2-api-2.12.1.jar
log4j-api-2.12.1.jar
log4j-core-2.12.1.jar
log4j-slf4j-impl-2.12.1.jar
mysql-connector-java-5.1.48.jar

flink bin/sql-client.sh embedded

CREATE TABLE user_behavior (
    user_id BIGINT,
    item_id BIGINT,
    category_id BIGINT,
    behavior STRING,
    ts TIMESTAMP(3),
    proctime AS PROCTIME(),                        -- computed column providing a processing-time attribute
    WATERMARK FOR ts AS ts - INTERVAL '5' SECOND   -- watermark on ts, making ts the event-time attribute
) WITH (
    'connector' = 'kafka',                              -- use the kafka connector
    'topic' = 'data_test',                              -- kafka topic
    'startup-mode' = 'earliest-offset',                 -- read from the earliest offset
    'properties.bootstrap.servers' = 'localhost:9092',  -- kafka broker address
    'format' = 'json'                                   -- the source format is json
);
Try downloading the flink-sql-connector-kafka jar and putting it under lib.
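If the jar is already sitting under lib (as the listing above suggests), the option key in the DDL is also worth a look: with the 1.11 factory selected by 'connector' = 'kafka', the startup option is 'scan.startup.mode'; 'startup-mode' is the key of the legacy 1.10-style connector and is not recognized by the new one. A minimal sketch of the DDL with that key, everything else as in the original post:

CREATE TABLE user_behavior (
    user_id BIGINT,
    item_id BIGINT,
    category_id BIGINT,
    behavior STRING,
    ts TIMESTAMP(3),
    proctime AS PROCTIME(),
    WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH (
    'connector' = 'kafka',
    'topic' = 'data_test',
    'scan.startup.mode' = 'earliest-offset',  -- 1.11 option key; 'startup-mode' was the 1.10-style key
    'properties.bootstrap.servers' = 'localhost:9092',
    'format' = 'json'
);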
On 2021-01-09 02:08:12, "inza9hi" <[hidden email]> wrote:
> [snip]
hello
After dropping flink-sql-connector-kafka_2.11-1.11.3.jar into lib, did you restart the SQL client and the cluster?

Best,
zhisheng

air23 <[hidden email]> wrote on Mon, Jan 11, 2021 at 1:32 PM:
> Try downloading the flink-sql-connector-kafka jar and putting it under lib.
> [snip]
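Once the jar is in place and the SQL client (plus, for a session cluster, the cluster itself) has been restarted, a quick probe from the SQL client shows whether the connector factory is now found. A minimal sketch, assuming the user_behavior table from the original post is re-created first:

-- re-run the CREATE TABLE from the original post, then:
SELECT user_id, item_id, behavior, ts FROM user_behavior;
-- if the kafka connector is picked up, this starts a streaming query
-- instead of failing while the table is being resolved.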