Hi,
我在使用 Flink 1.10.1 的 SQL 功能,HBase 的版本是 1.4.3。写入 HBase 时,validateSchemaAndApplyImplicitCast 报错,意思是 Query 的 Schema 和 Sink 的 Schema 不一致:Query Schema 中是 Row(EXPR$0) 这种,里面的字段名都是表达式;而 Sink Schema 中是 Row(device_id) 这种具体的字段名。我不知道在 SQL 中应该如何写,才能和 HBase 的 Sink Schema 保持一致。我尝试过类似 select device_id as rowkey, ROW( device_id as 这里不能as ) as f1 的写法,但 ROW() 内部不能使用 as;而不写 as 的话,Query 中 ROW 的 Schema 都是表达式,不是具体定义的字段。这里 Query 和 Sink 的字段个数是对上的,每个字段的类型也是对应上的,只是 Query 的 Schema 中是表达式,没法保持一致。报错信息如下: 关键代码: HBase sink ddl: String ddlSource = "CREATE TABLE test_hive_catalog.test_dim.dw_common_mobile_device_user_mapping (\n" + " rowkey STRING,\n" + " f1 ROW< \n" + " device_id STRING,\n" + " pass_id STRING,\n" + " first_date STRING,\n" + " first_channel_id STRING,\n" + " first_app_version STRING,\n" + " first_server_time STRING,\n" + " first_server_hour STRING,\n" + " first_ip_location STRING,\n" + " first_login_time STRING,\n" + " sys_can_uninstall STRING,\n" + " update_date STRING,\n" + " server_time BIGINT,\n" + " last_pass_id STRING,\n" + " last_channel_id STRING,\n" + " last_app_version STRING,\n" + " last_date STRING,\n" + " os STRING,\n" + " attribution_channel_id STRING,\n" + " attribution_first_date STRING,\n" + " p_product STRING,\n" + " p_project STRING,\n" + " p_dt STRING\n" + " >\n" + ") WITH (\n" + " 'connector.type' = 'hbase',\n" + " 'connector.version' = '1.4.3',\n" + // 即使绕过语法编译,换其他版本的hbase,还是有问题,如线上的版本就不行 " 'connector.table-name' = 'dw_common_mobile_device_user_mapping_new',\n" + " 'connector.zookeeper.quorum' = '"+ zookeeperServers +"',\n" + " 'connector.zookeeper.znode.parent' = '/hbase143',\n" + " 'connector.write.buffer-flush.max-size' = '2mb',\n" + " 'connector.write.buffer-flush.max-rows' = '1000',\n" + " 'connector.write.buffer-flush.interval' = '2s'\n" + ")"; insert into sql: String bodyAndLocalSql = "" + // "insert into test_hive_catalog.test_dim.dw_common_mobile_device_user_mapping " + "SELECT CAST(rowkey AS STRING) AS rowkey, " + " ROW(" + " device_id, pass_id, first_date, first_channel_id, first_app_version, first_server_time, 
first_server_hour, first_ip_location, first_login_time, sys_can_uninstall, update_date, server_time, last_pass_id, last_channel_id, last_app_version, last_date, os, attribution_channel_id, attribution_first_date, p_product, p_project, p_dt " + ") AS f1" + " FROM " + "(" + " SELECT " + " MD5(CONCAT_WS('|', kafka.uid, kafka.p_product, kafka.p_project)) AS rowkey, " + " kafka.uid AS device_id " + ",kafka.pass_id " + // first_date ",CASE WHEN COALESCE(hbase.server_time, 0) <= kafka.server_time " + // 新用户 " THEN FROM_UNIXTIME(kafka.server_time, 'yyyy-MM-dd') " + // 老用户 " ELSE hbase.first_date END AS first_date " + // first_channel_id ",CASE WHEN COALESCE(hbase.server_time, 0) <= kafka.server_time " + // 新用户 " THEN kafka.wlb_channel_id" + // 老用户 " ELSE hbase.first_channel_id END AS first_channel_id " + // first_app_version ",CASE WHEN COALESCE(hbase.server_time, 0) <= kafka.server_time " + // 新用户 " THEN kafka.app_version " + // 老用户 " ELSE hbase.first_app_version END AS first_app_version " + // first_server_time ",CASE WHEN COALESCE(hbase.server_time, 0) <= kafka.server_time " + // 新用户 " THEN FROM_UNIXTIME(kafka.server_time, 'yyyy-MM-dd HH:mm:ss') " + // 老用户 " ELSE hbase.first_server_time END AS first_server_time " + // first_server_hour ",CASE WHEN COALESCE(hbase.server_time, 0) <= kafka.server_time " + // 新用户 " THEN FROM_UNIXTIME(kafka.server_time, 'HH') " + // 老用户 " ELSE hbase.first_server_hour END AS first_server_hour " + // first_ip_location ",CASE WHEN COALESCE(hbase.server_time, 0) <= kafka.server_time " + // 新用户 " THEN kafka.ip_location " + // 老用户 " ELSE hbase.first_ip_location END AS first_ip_location " + // first_login_time ",CASE WHEN COALESCE(hbase.server_time, 0) <= kafka.server_time " + // 新用户 " THEN FROM_UNIXTIME(kafka.server_time, 'yyyy-MM-dd HH:mm:ss') " + // 老用户 " ELSE hbase.first_login_time END AS first_login_time " + ",kafka.sys_can_uninstall " + // update_date ",CASE WHEN hbase.pass_id = 0 " + " THEN CAST(FROM_UNIXTIME(kafka.server_time, 'yyyy-MM-dd') 
AS string) " + " END AS update_date " + // VARCHAR(2000) // server_time ",kafka.server_time" + ", kafka.pass_id AS last_pass_id" + ", kafka.wlb_channel_id AS last_channel_id" + ", kafka.app_version AS last_app_version" + ", CAST(FROM_UNIXTIME(kafka.server_time, 'yyyy-MM-dd') AS STRING) AS last_date" + // VARCHAR(2000) ", kafka.os" + ", hbase.attribution_channel_id" + ", hbase.attribution_first_date" + ", kafka.p_product" + ", kafka.p_project" + ", kafka.p_dt" + " FROM test_hive_catalog.test_ods.test_ods_header AS kafka " + " FULL JOIN test_hive_catalog.test_dim.dw_common_mobile_device_user_mapping AS hbase " + " ON kafka.uid = hbase.device_id " + // TODO 这里uid,后面要改成device_id " WHERE kafka.is_body=1 AND kafka.is_local=1" + ")"; |
Free forum by Nabble | Edit this page |