Syncing Kafka to Paimon with Flink, with Doris for accelerated queries

  • kafka to paimon
    • yarn-session launch command
    • Paimon with MinIO as the underlying storage
      • Per-database sync: generic command line
      • Syncing a single topic to a single table
    • Paimon with HDFS as the underlying storage
      • Command line for Kafka topic data with an embedded schema
        • Sample data format
        • Command line
  • Paimon time travel
    • Querying table snapshots
    • Querying tags
    • Generating tags periodically
  • Querying Paimon
    • Basic syntax
    • Creating a Doris catalog on HDFS
    • Flink SQL: querying tables on S3
    • Flink SQL: querying table data on HDFS
    • Flink SQL: querying table snapshot data
  • Problems encountered and solutions
    • 1. With Hive metastore configured, dropping a Paimon table in Flink SQL does not remove the table metadata stored in Hive
    • 2. Kafka CDC API debugging errors
    • 3. Failed to execute goal on project paimon-common: Could not resolve dependencies for project org.apache.paimon:paimon-common:jar:1.0-SNAPSHOT: Could not find artifact org.apache.paimon:paimon-test-utils:jar:1.0-SNAPSHOT in aliyunmaven (https://maven.aliyun.com/repository/public) -> [Help 1]
    • 4. This exception is intentionally thrown after committing the restored checkpoints. By restarting the job we hope that writers can start writing based on these new commits.
    • 5. Caused by: java.lang.ClassNotFoundException: org.codehaus.stax2.XMLInputFactory2
    • 6. The Paimon command line does not support dropping columns; after dropping a column via Flink SQL, refresh the catalog metadata on the query side before re-querying
  • PG CDC sync with Flink + Paimon
    • flink 1.15.3 + pg 11
    • flink 1.15.3 + pg 15

kafka to paimon

yarn-session launch command

Start a local session cluster to serve Flink SQL queries; for accelerated querying, Doris is recommended.

bin/yarn-session.sh -nm yarn-session2 -tm 6144m -qu flink -d

Regular expression for dynamically discovering Kafka topics during sync:

^debezium.plus.test.test_instance.(?!mc_background_setting$).+$
Excluded topic: debezium.plus.test.test_instance.mc_background_setting

Notes for running the sync command:
Note: the dynamic topic discovery parameter is properties.partition.discovery.interval.ms=30000
Reference: https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/connectors/datastream/kafka/

scan.topic-partition-discovery.interval is not a Kafka property.

Paimon with MinIO as the underlying storage

Per-database sync: generic command line

bin/flink run -m yarn-cluster \
-ynm paimon_kafka_sync_database \
-yqu flink \
-ytm 4096m \
-ys 1 \
-p 6 \
-D execution.runtime-mode=batch \
-D execution.buffer-timeout=10ms \
-D taskmanager.memory.managed.fraction=0.4 \
-D table.exec.resource.default-parallelism=6 \
lib/paimon-flink-action-1.0.1.jar \
kafka_sync_database \
--warehouse s3://dev-bucket-bigdata-flink/paimon \
--database demo \
--primary_keys id \
--kafka_conf connector=upsert-kafka \
--kafka_conf 'properties.bootstrap.servers=kafka-test01.com:32295,kafka-test02.com:32295,kafka-test03.com:32295' \
--kafka_conf 'properties.security.protocol=SASL_PLAINTEXT' \
--kafka_conf 'properties.sasl.mechanism=PLAIN' \
--kafka_conf 'properties.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="username" password="password";' \
--kafka_conf 'topic=debezium.plus.test_breeding.test_breeding.feed_message_report' \
--kafka_conf "properties.group.id=cid_yz.bigdata.paimon_sync_$(date +%s)" \
--kafka_conf 'properties.partition.discovery.interval.ms=30000' \
--kafka_conf 'properties.max.poll.records=1000' \
--kafka_conf 'scan.startup.mode=earliest-offset' \
--kafka_conf 'key.format=debezium-json' \
--kafka_conf 'value.format=debezium-json' \
--catalog_conf metastore=filesystem \
--catalog_conf 's3.endpoint=https://yos.test.com' \
--catalog_conf 's3.access-key=bigdata-flink-user' \
--catalog_conf 's3.secret-key=25dhHosSbUxcsQJINzKZr8D' \
--catalog_conf 's3.path.style.access=true' \
--catalog_conf 's3.connection.maximum=50' \
--catalog_conf 's3.threads.max=20' \
--table_conf bucket=6 \
--table_conf changelog-producer=input \
--table_conf sink.parallelism=6 \
--table_conf 'compaction.trigger=num_commits' \
--table_conf 'compaction.num_commits=10' \
--table_conf schema.automerge=false \
--table_conf auto-create-table=true \
--computed_column 'compute_time__=now() STORED' \
--table_conf 'snapshot.time-retained=1h' \
--table_conf 'snapshot.num-retained.min=1' \
--table_conf 'snapshot.num-retained.max=5' \
--table_conf tag.automatic-creation=process-time \
--table_conf tag.creation-period=hourly

Option notes: compute_time__ adds a computed processing-time column; snapshot.time-retained, snapshot.num-retained.min and snapshot.num-retained.max control how long and how many snapshots are retained; tag.automatic-creation=process-time creates tags based on processing time so that full snapshot versions are preserved; tag.creation-period=hourly sets the tag creation period.
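
After the job creates the tables, which table options actually took effect can be checked from the query side. A minimal sketch in Flink SQL, assuming a catalog named paimon_catalog over the same warehouse and a table name feed_message_report derived from the topic above; Paimon's options system table lists the options stored with the table.

-- Hypothetical catalog/table names; adjust to whatever the sync job created.
SELECT * FROM `paimon_catalog`.`demo`.`feed_message_report$options`;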

Syncing a single topic to a single table

bin/flink run -m yarn-cluster -ynm paimon_kafka_sync_database -ytm 3172m -ys 1 -yqu flink -d \
-D execution.runtime-mode=batch \
lib/paimon-flink-action-0.8.2.jar \
kafka_sync_table  \
--warehouse s3://dev-bucket-bigdata-flink/paimon \
--database demo \
--table event_list2 \
--partition_keys org_id \
--primary_keys org_id,id \
--kafka_conf connector=upsert-kafka \
--kafka_conf properties.bootstrap.servers=kafka-test01.com:32295,kafka-test02.com:32295,kafka-test03.com:32295 \
--kafka_conf properties.security.protocol=SASL_PLAINTEXT \
--kafka_conf properties.sasl.mechanism=PLAIN \
--kafka_conf properties.sasl.jaas.config='org.apache.kafka.common.security.plain.PlainLoginModule required username="username" password="password";' \
--kafka_conf topic=debezium.plus.test_citus.test_citus.event_list \
--kafka_conf scan.startup.mode=earliest-offset \
--kafka_conf key.format=debezium-json \
--kafka_conf value.format=debezium-json \
--catalog_conf metastore=filesystem \
--catalog_conf s3.endpoint=https://yos.test.com \
--catalog_conf s3.access-key='bigdata-flink-user' \
--catalog_conf s3.secret-key='25dhHosSbUtx9XQJINzKZr8D' \
--catalog_conf s3.path.style.access='true' \
--table_conf bucket=4 \
--table_conf changelog-producer=input \
--table_conf sink.parallelism=4
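
Since event_list2 is partitioned by org_id, the partitions that were actually created can be checked through Paimon's partitions system table. A minimal sketch in Flink SQL, assuming a catalog named paimon_catalog defined over the same S3 warehouse:

-- Partition list with record counts and file sizes (catalog name assumed).
SELECT * FROM `paimon_catalog`.`demo`.`event_list2$partitions`;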

Paimon with HDFS as the underlying storage

Command line for Kafka topic data with an embedded schema:

Sample data format (partial):
{"schema": {"type": "struct","fields": [{"type": "struct","fields": [{"type": "int64","optional": false,"field": "id"},{"type": "string","optional": true,"field": "create_user"}],"optional": false,"name": "debezium.plus.test_metrics.test_metrics.ods_event_list.Envelope"},"payload": {"before": null,"after": {"id": 781465173331251450,"create_user": "708978104768487425","create_time": "2020-11-26 10:23:38.555","source_type": 0,"status": 1,},"source": {"version": "1.8.1.Final-YZ","connector": "postgresql","name": "test_metrics","txId": 3655680338,"lsn": 33390055263776,"xmin": null},"op": "r","ts_ms": 1742200695318,"transaction": null}
}
Command line:
bin/flink run -m yarn-cluster -ynm paimon_kafka_sync_table -ytm 3172m -ys 1 -yqu flink -d \
-D execution.runtime-mode=batch \
lib/paimon-flink-action-1.0.1.jar \
kafka_sync_table  \
--warehouse hdfs:///paimon/flink \
--database demo \
--table ods_event_list4 \
--primary_keys id \
--kafka_conf connector=upsert-kafka \
--kafka_conf properties.bootstrap.servers=kafka-test01.com:32295,kafka-test02.com:32295,kafka-test03.com:32295 \
--kafka_conf properties.security.protocol=SASL_PLAINTEXT \
--kafka_conf properties.sasl.mechanism=PLAIN \
--kafka_conf properties.sasl.jaas.config='org.apache.kafka.common.security.plain.PlainLoginModule required username="username" password="password";' \
--kafka_conf topic=debezium.plus.test_metrics.test_metrics.ods_event_list \
--kafka_conf scan.startup.mode=earliest-offset \
--kafka_conf key.format=debezium-json \
--kafka_conf value.format=debezium-json \
--kafka_conf debezium-json.schema-include=true \
--catalog_conf metastore=hive \
--catalog_conf uri=thrift://hdfs-test04.com:9083 \
--table_conf bucket=4 \
--table_conf changelog-producer=input \
--table_conf sink.parallelism=4
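
Because the topic carries an embedded Debezium schema, it can be worth verifying what schema Paimon actually inferred for the auto-created table. A minimal sketch in Flink SQL using the schemas system table, assuming a catalog named my_catalog over the same hdfs:///paimon/flink warehouse:

-- Schema versions recorded for the auto-created table (catalog name assumed).
SELECT schema_id, fields, primary_keys FROM `my_catalog`.`demo`.`ods_event_list4$schemas`;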

Paimon time travel

Querying table snapshots

Query the table's snapshot versions:

SELECT * FROM `paimon_catalog`.`demo`.`ods_event_list10$snapshots`;

Query data at a specific snapshot id:

SELECT id, source_id
FROM `paimon_catalog`.`demo`.`ods_event_list10` /*+ OPTIONS('scan.snapshot-id'='7') */
WHERE 1=1;
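
Besides scan.snapshot-id, Paimon can also time-travel by timestamp. A minimal sketch against the same table; the scan.timestamp-millis value below is an arbitrary example and should be replaced with the epoch milliseconds of interest:

SELECT id, source_id
FROM `paimon_catalog`.`demo`.`ods_event_list10` /*+ OPTIONS('scan.timestamp-millis' = '1742200695000') */;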

Querying tags

SELECT * FROM `paimon_catalog`.`demo`.`ods_event_list10$tags`;

SELECT id, source_id
FROM `paimon_catalog`.`demo`.`ods_event_list10` /*+ OPTIONS('scan.tag-name' = '2025-04-30 13') */;
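
Tags can also be created and deleted manually. A hedged sketch using Paimon's Flink SQL procedures, which require a Flink version that supports CALL statements (1.18+); on older Flink versions the create_tag action of the paimon-flink-action jar serves the same purpose. The tag name below is an arbitrary example:

CALL sys.create_tag('demo.ods_event_list10', 'backup-2025-04-30');
CALL sys.delete_tag('demo.ods_event_list10', 'backup-2025-04-30');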

Generating tags periodically

bin/flink run -m yarn-cluster \
-ynm paimon_kafka_sync_database \
-yqu flink \
-ytm 4096m \
-ys 1 \
-p 1 \
-D execution.runtime-mode=batch \
-D execution.buffer-timeout=10ms \
-D taskmanager.memory.managed.fraction=0.4 \
-D table.exec.resource.default-parallelism=1 \
-D state.checkpoints.dir=hdfs://yinziid/flink/flink-checkpoints/3d371a1d8f175c810df708c40198ee2c/chk-1869 \
lib/paimon-flink-action-1.1.0.jar \
kafka_sync_database \
--warehouse s3://dev-bucket-bigdata-flink/paimon \
--database demo \
--primary_keys id \
--kafka_conf connector=upsert-kafka \
--kafka_conf 'properties.bootstrap.servers=kafka-test01.com:32295,kafka-test02.com:32295,kafka-test03.com:32295' \
--kafka_conf 'properties.security.protocol=SASL_PLAINTEXT' \
--kafka_conf 'properties.sasl.mechanism=PLAIN' \
--kafka_conf 'properties.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="bigdata" password="XPZavdlM";' \
--kafka_conf 'topic=debezium.plus.test_metrics.test_metrics.ods_event_list10' \
--kafka_conf "properties.group.id=cid_yz.bigdata.paimon_sync_$(date +%s)" \
--kafka_conf 'properties.partition.discovery.interval.ms=30000' \
--kafka_conf 'properties.max.poll.records=1000' \
--kafka_conf 'scan.startup.mode=earliest-offset' \
--kafka_conf 'key.format=debezium-json' \
--kafka_conf 'value.format=debezium-json' \
--catalog_conf metastore=filesystem \
--catalog_conf 's3.endpoint=https://yos.test.com' \
--catalog_conf 's3.access-key=bigdata-flink-user' \
--catalog_conf 's3.secret-key=25dhHosSbUtx9XQJINzKZr8D' \
--catalog_conf 's3.path.style.access=true' \
--catalog_conf 's3.connection.maximum=50' \
--catalog_conf 's3.threads.max=20' \
--table_conf bucket=6 \
--table_conf changelog-producer=input \
--table_conf sink.parallelism=1 \
--table_conf 'compaction.trigger=num_commits' \
--table_conf 'compaction.num_commits=10' \
--table_conf schema.automerge=false \
--table_conf auto-create-table=true \
--table_conf audit-log.enabled=true \
--computed_column 'compute_time__=now() STORED' \
--table_conf 'snapshot.time-retained=1h' \
--table_conf 'snapshot.num-retained.min=1' \
--table_conf 'snapshot.num-retained.max=5' \
--table_conf tag.automatic-creation=process-time \
--table_conf tag.creation-period=hourly
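
The command above sets audit-log.enabled=true; row-level change types can then be inspected through Paimon's audit_log system table, which exposes a rowkind column (+I, -U, +U, -D). A minimal sketch, assuming the same catalog and table names as in the time-travel examples:

SELECT rowkind, id FROM `paimon_catalog`.`demo`.`ods_event_list10$audit_log`;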

Querying Paimon

Basic syntax

show create catalog minio_paimon_s3;
show catalogs;
refresh catalog paimon_catalog;   -- refresh metadata
switch paimon_catalog;            -- switch catalog
show databases;
use demo;
show tables;
select * from paimon_catalog.demo.ods_ids_table_column_info;

CREATE CATALOG iceberg_catalog PROPERTIES (
    "type" = "iceberg",
    "iceberg.catalog.type" = "hms",
    "hive.metastore.uris" = "thrift://hive-metastore:9083",
    "metadata_refresh_interval_sec" = "3600"  -- automatically refresh metadata every hour (use "10" for every 10 seconds)
);

-- list tables and sample queries
show tables in my_catalog.demo;
select * from my_catalog.demo.anc_event_list1;
select * from my_catalog.demo.anc_herd_file_change_log_102589;

Creating a Doris catalog on HDFS

-- refresh after schema changes
refresh catalog my_catalog;

CREATE CATALOG my_catalog PROPERTIES (
"type" = "paimon",
"warehouse" = "hdfs://yinziid/paimon/flink",
"dfs.nameservices" = "yinziid",
"dfs.ha.namenodes.yinziid" = "nn1,nn2",
"dfs.namenode.rpc-address.yinziid.nn1" = "hdfs://hdfs-test06.com:8020",
"dfs.namenode.rpc-address.yinziid.nn2" = "hdfs://hdfs-test01.com:8020",
"dfs.client.failover.proxy.provider.yinziid" = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider",
"hadoop.username" = "hdfs"
);

Creating a Doris catalog on S3:

CREATE CATALOG minio_paimon_s3 PROPERTIES (
"warehouse" = "s3://dev-bucket-bigdata-flink/paimon",
"use_path_style" = "true",
"type" = "paimon",
"s3.access_key" = "bigdata-flink-user",
"s3.secret_key" = "25dhHosSbUtx9XQJINzKZr8D",
"s3.region" = "us-east-1",
"s3.path.style.access" = "true",
"s3.endpoint" = "https://yos.test.com",'time-zone' = 'Asia/Shanghai'
);
CREATE CATALOG paimon_catalog PROPERTIES (
"warehouse" = "s3://test-bucket-bigdata-flink/paimon",
"use_path_style" = "true",
"type" = "paimon",
"s3.access_key" = "admin",
"s3.secret_key" = "1234",
"s3.region" = "us-east-1",
"s3.path.style.access" = "true",
"s3.endpoint" = "http://10.37.0.109:9000",
"time-zone" = "Asia/Shanghai",
"metadata_refresh_interval_sec" = "3600"  -- 每隔 1 小时自动刷新元数据
);

Flink SQL: querying tables on S3

-- create the catalog

CREATE CATALOG minio_paimon_s3 WITH (
    'type' = 'paimon',
    'warehouse' = 's3://dev-bucket-bigdata-flink/paimon',
    's3.endpoint' = 'https://yos.test.com',
    's3.access-key' = 'bigdata-flink-user',
    's3.path.style.access' = 'true',
    's3.secret-key' = '25dhHosSbUtx9XQJINzKZr8D',
    'time-zone' = 'Asia/Shanghai'
);

Flink SQL: querying table data on HDFS

CREATE CATALOG my_catalog WITH(
'type'='paimon',
'warehouse'='hdfs:/paimon/flink/'
);

Flink SQL: querying table snapshot data

-- Query the table's snapshot versions
SELECT * FROM `paimon_catalog`.`demo`.`ods_anc_event_list10$snapshots`;

-- Query snapshot data by snapshot id
SELECT id, source_id
FROM `paimon_catalog`.`demo`.`ods_anc_event_list10` /*+ OPTIONS('scan.snapshot-id'='7') */
WHERE 1=1;

-- Query tags
SELECT * FROM `paimon_catalog`.`demo`.`ods_anc_event_list10$tags`;

SELECT id, source_id
FROM `paimon_catalog`.`demo`.`ods_anc_event_list10` /*+ OPTIONS('scan.tag-name' = '2025-04-30 13') */;
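
Snapshots can also be compared incrementally: the incremental-between option reads only the changes committed between two snapshot ids. A minimal sketch against the same table; the snapshot ids 3 and 7 are arbitrary examples:

SELECT id, source_id
FROM `paimon_catalog`.`demo`.`ods_anc_event_list10` /*+ OPTIONS('incremental-between' = '3,7') */;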

Problems encountered and solutions

1. With Hive metastore configured, dropping a Paimon table in Flink SQL does not remove the table metadata stored in Hive

Restarting the job then fails with:
Caused by: java.lang.RuntimeException: There is no paimon table in hdfs://yinziid/paimon/flink/demo.db/test_20250314

Solution: the Hive metastore configuration can be removed.
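
If removing the Hive metastore configuration is not acceptable, one possible cleanup (not verified here) is to drop the orphaned table definition directly on the Hive side, assuming beeline access to the same metastore:

-- Hive SQL via beeline; removes only the metastore entry left behind for the dropped Paimon table.
DROP TABLE IF EXISTS demo.test_20250314;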

2. Kafka CDC API debugging errors

Error 1: Could not forward element to next operator

Error 2: java.lang.RuntimeException: java.lang.reflect.InaccessibleObjectException: Unable to make field private final java.lang.Object[] java.util.Arrays$ArrayList.a accessible: module java.base does not "opens java.util" to unnamed module @5456afaa

Error 3: package sun.misc does not exist

Solution: the JDK version is too high; downgrade to JDK 1.8.


3. Failed to execute goal on project paimon-common: Could not resolve dependencies for project org.apache.paimon:paimon-common:jar:1.0-SNAPSHOT: Could not find artifact org.apache.paimon:paimon-test-utils:jar:1.0-SNAPSHOT in aliyunmaven (https://maven.aliyun.com/repository/public) -> [Help 1]

A related error: after cloning the Paimon 1.0 source from GitHub, debugging fails because the JavaParser package cannot be found.

Add the Apache SNAPSHOT repository to the project's pom.xml or to the global Maven settings (~/.m2/settings.xml):

<repositories>
  <repository>
    <id>apache-snapshots</id>
    <name>Apache Snapshots</name>
    <url>https://repository.apache.org/snapshots</url>
    <releases><enabled>false</enabled></releases>
    <snapshots><enabled>true</enabled></snapshots>
  </repository>
</repositories>

4. This exception is intentionally thrown after committing the restored checkpoints. By restarting the job we hope that writers can start writing based on these new commits.

The network latency between the servers is too high; increase the checkpoint interval and timeout:

-D execution.checkpointing.timeout

-D execution.checkpointing.interval

5. Caused by: java.lang.ClassNotFoundException: org.codehaus.stax2.XMLInputFactory2

1. With Hive metastore configured, a Paimon table dropped in Flink SQL keeps its metadata in Hive, and restarting the job fails with:

Error: Caused by: java.lang.RuntimeException: There is no paimon table in hdfs://yinziid/paimon/flink/demo.db/test_20250314

Solution: remove the Hive metastore configuration.

(Still to be re-verified.)

2. The source table's id column is bigint, but when consuming the Debezium data collected into Kafka, Paimon's auto-generated CREATE TABLE declares every column as a string (VARCHAR(2147483647)); the job then fails with a data conversion error: java.lang.UnsupportedOperationException: Cannot convert field id from type STRING NOT NULL to BIGINT of Paimon table demo.test_20250314_3.

3. When consuming the Debezium data collected into Kafka, Paimon's auto-generated CREATE TABLE declares every column as VARCHAR(2147483647); recreating the table with the column changed to bigint then triggers a type-incompatibility error:

Caused by: java.lang.RuntimeException: InvalidOperationException(message:The following columns have types incompatible with the existing columns in their respective positions :

The CREATE TABLE statement auto-generated by Paimon is:

CREATE TABLE `my_catalog`.`demo`.`test_20250314_2` (
  `id` VARCHAR(2147483647) NOT NULL,
  `name` VARCHAR(2147483647) NOT NULL,
  `id2` VARCHAR(2147483647),
  CONSTRAINT `PK_name_id` PRIMARY KEY (`name`, `id`) NOT ENFORCED
) PARTITIONED BY (`name`)
WITH (
  'bucket' = '4',
  'path' = 'hdfs:/paimon/flink/demo.db/test_20250314_2',
  'changelog-producer' = 'input',
  'sink.parallelism' = '4'
);

The manually issued CREATE TABLE statement with the corrected type is:

drop table if exists my_catalog.demo.test_20250314_2;

CREATE TABLE my_catalog.demo.test_20250314_2 (
  id BIGINT NOT NULL,
  name VARCHAR(2147483647) NOT NULL,
  id2 VARCHAR(2147483647),
  CONSTRAINT PK_name_id PRIMARY KEY (name, id) NOT ENFORCED
) PARTITIONED BY (name)
WITH (
  'bucket' = '4',
  'path' = 'hdfs:/paimon/flink/demo.db/test_20250314_2',
  'changelog-producer' = 'input',
  'sink.parallelism' = '4'
);

Flink 1.15.3 + Paimon 1.0.1, command line:

bin/flink run -m yarn-cluster -ynm paimon_kafka_sync_table -ytm 3172m -ys 1 -yqu flink -d \
-D execution.runtime-mode=batch \
lib/paimon-flink-action-1.0.1.jar \
kafka_sync_table  \
--warehouse hdfs:///paimon/flink \
--database demo \
--table ods_anc_event_list2 \
--primary_keys id \
--kafka_conf connector=upsert-kafka \
--kafka_conf properties.bootstrap.servers=kafka-test01.com:32295,kafka-test02.com:32295,kafka-test03.com:32295 \
--kafka_conf properties.security.protocol=SASL_PLAINTEXT \
--kafka_conf properties.sasl.mechanism=PLAIN \
--kafka_conf properties.sasl.jaas.config='org.apache.kafka.common.security.plain.PlainLoginModule required username="bigdata" password="XPZavdlM";' \
--kafka_conf topic=debezium.plus.test_fpf_metrics.test_fpf_metrics.ods_anc_event_list \
--kafka_conf scan.startup.mode=earliest-offset \
--kafka_conf key.format=debezium-json \
--kafka_conf value.format=debezium-json \
--kafka_conf debezium-json.schema-include=true \
--catalog_conf metastore=hive \
--catalog_conf uri=thrift://hdfs-test04.com:9083 \
--table_conf bucket=4 \
--table_conf changelog-producer=input \
--table_conf sink.parallelism=4 \
--table_conf schema.automerge=true \
--table_conf auto-create-table=true \
--table_conf merge-engine=partial-update \
--table_conf schema.allows-nullable=true \
--table_conf schema.auto-delete-fields=false \
--table_conf log.schema-changes=true \
--table_conf schema.all-fields-nullable=true \
--table_conf debezium-json.ignore-parse-errors=true

Running it fails with:

java.lang.NoSuchMethodError: org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema.deserialize(Lorg/apache/flink/kafka/shaded/org/apache/kafka/clients/consumer/ConsumerRecord;)Ljava/lang/Object;

Solution: flink-connector-kafka-1.15.3.jar conflicts with the Paimon 1.0.1 jar (Paimon 0.8.2 does not have this problem); remove flink-connector-kafka-1.15.3.jar.

No data was written and the snapshot failed, with the following error:

Caused by: java.io.IOException: java.lang.IllegalArgumentException: By default, Partial update can not accept delete records, you can choose one of the following solutions: 1. Configure 'ignore-delete' to ignore delete records. 2. Configure 'partial-update.remove-record-on-delete' to remove the whole row when receiving delete records. 3. Configure 'sequence-group's to retract partial columns.

Solution: remove the merge-engine=partial-update configuration.
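
If the partial-update merge engine is actually required, the error message itself lists alternatives. A hedged sketch of its option 2, setting the table option from Flink SQL instead of removing the merge engine (catalog and table names assumed from the command above):

ALTER TABLE `my_catalog`.`demo`.`ods_anc_event_list2` SET (
  'partial-update.remove-record-on-delete' = 'true'
);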

Debezium captures timestamp fields as Long values; after syncing through Paimon Kafka CDC the times are 8 hours behind

bin/flink run -m yarn-cluster \
-ynm paimon_kafka_sync_database \
-yqu flink \
-ytm 8192m \
-ys 1 \
-p 1 \
-D user.timezone=Asia/Shanghai \
-D table.local-time-zone=Asia/Shanghai \
-D execution.buffer-timeout=10ms \
-D taskmanager.memory.managed.fraction=0.4 \
-D table.exec.resource.default-parallelism=6 \
lib/paimon-flink-action-1.0.1.jar \
kafka_sync_database \
--warehouse s3://dev-bucket-bigdata-flink/paimon \
--database demo \
--primary_keys id \
--kafka_conf connector=upsert-kafka \
--kafka_conf 'properties.bootstrap.servers=kafka-test01.com:32295,kafka-test02.com:32295,kafka-test03.com:32295' \
--kafka_conf 'properties.security.protocol=SASL_PLAINTEXT' \
--kafka_conf 'properties.sasl.mechanism=PLAIN' \
--kafka_conf 'properties.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="bigdata" password="XPZavdlM";' \
--kafka_conf 'topic=debezium.plus.test_topsea_ids_entity_template.dev_topsea_ids_entity_template.ods_ids_table_column_info' \
--kafka_conf "properties.group.id=cid_yz.bigdata.paimon_sync_$(date +%s)" \
--kafka_conf 'properties.partition.discovery.interval.ms=30000' \
--kafka_conf 'properties.max.poll.records=1000' \
--kafka_conf 'scan.startup.mode=earliest-offset' \
--kafka_conf 'key.format=debezium-json' \
--kafka_conf 'value.format=debezium-json' \
--kafka_conf 'value.fields-include=ALL' \
--kafka_conf 'value.debezium-json.timestamp-format.zone=Asia/Shanghai' \
--kafka_conf 'value.debezium-json.timestamp-format.local-to-utc=false' \
--kafka_conf 'value.debezium-json.timestamp-format.absolute.field.conversion-mode=local' \
--catalog_conf metastore=filesystem \
--catalog_conf 's3.endpoint=https://yos.test.com' \
--catalog_conf 's3.access-key=bigdata-flink-user' \
--catalog_conf 's3.secret-key=25dhHosSbUtx9XQJINzKZr8D' \
--catalog_conf 's3.path.style.access=true' \
--catalog_conf 's3.connection.maximum=50' \
--catalog_conf 's3.threads.max=20' \
--table_conf bucket=6 \
--table_conf changelog-producer=input \
--table_conf sink.parallelism=1 \
--table_conf 'timezone=Asia/Shanghai' \
--table_conf 'fields.create_time.data-type=TIMESTAMP_LTZ(6)' \
--table_conf 'fields.modify_time.data-type=TIMESTAMP_LTZ(6)' \
--table_conf 'schema.type-mapping.debezium.microtimestamp=TIMESTAMP_LTZ(6)' \
--table_conf 'schema.all-fields-nullable=false' \
--table_conf 'compaction.trigger=num_commits' \
--table_conf 'compaction.num_commits=5' \
--table_conf 'full-compaction.delta-commits=10' \
--table_conf schema.automerge=false \
--table_conf auto-create-table=true

Time-typed fields were still 8 hours behind.
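
One thing worth ruling out on the query side (a hedged note, not a confirmed fix for the offset above): the Flink SQL session time zone controls how TIMESTAMP_LTZ values are rendered, so an 8-hour shift can also come from the client session rather than from the stored data:

-- In the Flink SQL client, before querying:
SET 'table.local-time-zone' = 'Asia/Shanghai';
SELECT create_time, modify_time FROM `paimon_catalog`.`demo`.`ods_ids_table_column_info` LIMIT 10;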

6. The Paimon command line does not support dropping columns; after executing a DROP column statement in Flink SQL, refresh the catalog metadata on the query side and re-query to see the updated columns

Drop statement: ALTER TABLE minio_paimon_s3.demo.ods_anc_event_list_append DROP (src_biz_id);

Refresh metadata: refresh catalog minio_paimon_s3;

org.apache.flink.kafka.shaded.org.apache.kafka.common.KafkaException: javax.security.auth.login.LoginException: unable to find LoginModule class: org.apache.kafka.common.security.plain.PlainLoginModule

Solution: uploading kafka-clients-3.3.0.jar fixes it.

Creating a session cluster

bin/yarn-session.sh -nm yarn-session -tm 6144m -s 3 -d

bin/yarn-session.sh -nm yarn-session2 -tm 6144m -qu flink -d

Entering the Flink SQL client

bin/sql-client.sh -s yarn-session [-i sql_client_init.sql]
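
The optional -i init file is simply a SQL script executed when the client starts. A minimal sketch of what sql_client_init.sql could contain, reusing the S3 catalog definition from earlier (file name and contents are illustrative):

-- sql_client_init.sql
CREATE CATALOG minio_paimon_s3 WITH (
    'type' = 'paimon',
    'warehouse' = 's3://dev-bucket-bigdata-flink/paimon',
    's3.endpoint' = 'https://yos.test.com',
    's3.access-key' = 'bigdata-flink-user',
    's3.secret-key' = '25dhHosSbUtx9XQJINzKZr8D',
    's3.path.style.access' = 'true'
);
USE CATALOG minio_paimon_s3;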

PG CDC sync with Flink + Paimon

flink 1.15.3 + pg 11

Path: /data/bigdata/flink/flink_client/flink-1.15.3

CDC command:

bin/flink run \
lib/paimon-flink-action-0.8.2.jar \
postgres_sync_table \
--warehouse hdfs:///paimon/flink \
--database demo \
--table org_address \
--partition_keys modify_time \
--primary_keys modify_time,id \
--postgres_conf hostname=pg-bigdata.test.com \
--postgres_conf username=decoderbufs \
--postgres_conf password=8JYd63gd5163 \
--postgres_conf port=5577 \
--postgres_conf database-name='test_metrics_dwd' \
--postgres_conf schema-name='ods' \
--postgres_conf table-name='org_address' \
--postgres_conf slot.name='paimon_postgres' \
--catalog_conf metastore=hive \
--catalog_conf uri=thrift://hdfs-test04.com:9083 \
--table_conf bucket=1 \
--table_conf changelog-producer=input \
--table_conf sink.parallelism=1

flink 1.15.3 + pg 15

Problem (the original error screenshot is not preserved): the sync fails with the default decoding plugin on PG 15.

The configuration --postgres_conf decoding.plugin.name=wal2json must be added; the default is decoderbufs.

bin/flink run \
lib/paimon-flink-action-0.8.2.jar \
postgres_sync_table \
--warehouse hdfs:///paimon/flink \
--database demo \
--table org_farm \
--partition_keys modify_time \
--primary_keys modify_time,id \
--postgres_conf hostname=pg.test.com \
--postgres_conf username=decoderbufs \
--postgres_conf password=8JYd63gd5163 \
--postgres_conf port=30431 \
--postgres_conf database-name='test_bizcenter' \
--postgres_conf schema-name='test_bizcenter' \
--postgres_conf table-name='org_farm' \
--postgres_conf slot.name='paimon_postgres_1qaz' \
--postgres_conf decoding.plugin.name=wal2json \
--catalog_conf metastore=hive \
--catalog_conf uri=thrift://hdfs-test04.com:9083 \
--table_conf bucket=1 \
--table_conf changelog-producer=input \
--table_conf sink.parallelism=1

