本文为您介绍如何使用数据传输同步 OceanBase 数据库的数据至 RocketMQ。
背景信息
消息队列 RocketMQ 是阿里云基于 Apache RocketMQ 构建的低延迟、高并发、高可靠的分布式消息中间件。数据传输的数据同步功能可以帮助您实现 OceanBase 数据库的物理表和 RocketMQ 数据源之间的数据实时同步,扩展消息处理能力。
同步 OceanBase 数据库的数据至 RocketMQ 时,两种租户对应的数据格式请参见 数据库传输到文本协议的格式说明。
前提条件
数据传输已具备云资源访问权限。详情请参见 数据传输迁移角色授权。
已为源端 OceanBase 数据库创建专用于数据同步任务的数据库用户,并为其赋予了相关权限。详情请参见 创建数据库用户。
使用限制
数据传输支持的 RocketMQ 实例版本为 V4.x 和 V5.x,包含商业版和社区版。
数据同步的对象仅支持物理表,不支持其他对象。
数据同步过程中,数据传输支持删除表之后再新建表,即支持对已经同步的表进行
DROP TABLE
操作后,再执行CREATE TABLE
。数据传输不支持通过重命名的方式新建表,即不支持执行RENAME TABLE a TO a_tmp
操作。待同步的表名和其中的列名不能包含中文字符。
数据传输仅支持迁移库名、表名和列名为 ASCII 码且不包含特殊字符(包括换行、空格,以及 .|"'`()=;/&\)的对象。
数据传输不支持 OceanBase 备库作为源端。
注意事项
当 OceanBase 数据库 V4.x 进行增量同步时,如果生成列没有标记 STORED 属性,则同步目标端时该列将同步为 NULL 值,导致下游接收到的该列数据时不符合预期。
当更新的行包括 LOB 列时:
如果 LOB 列为更新列,请勿依赖 LOB 列在
UPDATE
或DELETE
操作前的值。目前使用 LOB 列进行存储的数据类型包括 JSON、GIS、XML、UDT(用户定义类型),以及 LONGTEXT、MEDIUMTEXT 等各类 TEXT。
如果 LOB 列为非更新列,LOB 列在
UPDATE
或DELETE
操作前或操作后的值均为 NULL。
当任务意外中断进行断点续传时,RocketMQ 实例中可能会存在部分重复数据(最近一分钟内),因此下游系统需要具备排重能力。
节点之间的时钟不同步,或者电脑终端和服务器之间的时钟不同步,均可能导致增量同步的延迟时间不准确。
例如,如果时钟早于标准时间,可能导致延迟时间为负数。如果时钟晚于标准时间,可能导致延迟。
如果在创建数据同步任务时,您仅配置了 增量同步,数据传输要求源端数据库的本地增量日志保存 48 小时以上。
如果在创建数据同步任务时,您配置了 全量同步+增量同步,数据传输要求源端数据库的本地增量日志至少保留 7 天以上。否则数据传输可能因无法获取增量日志而导致数据同步任务失败,甚至导致源端和目标端数据不一致。
支持的源端和目标端实例类型
下表中,OceanBase 数据库 MySQL 租户简称为 OB_MySQL,OceanBase 数据库 Oracle 租户简称为 OB_Oracle。
源端 | 目标端 |
OB_MySQL(OceanBase 集群实例) | RocketMQ(阿里云 RocketMQ 实例) |
OB_MySQL(OceanBase 集群实例) | RocketMQ(VPC 内自建 RocketMQ 实例) |
OB_MySQL(OceanBase 集群实例) | RocketMQ(公网 RocketMQ 实例) |
OB_MySQL(Serverless 实例) | RocketMQ(阿里云 RocketMQ 实例) |
OB_MySQL(Serverless 实例) | RocketMQ(VPC 内自建 RocketMQ 实例) |
OB_MySQL(Serverless 实例) | RocketMQ(公网 RocketMQ 实例) |
OB_Oracle(OceanBase 集群实例) | RocketMQ(阿里云 RocketMQ 实例) |
OB_Oracle(OceanBase 集群实例) | RocketMQ(VPC 内自建 RocketMQ 实例) |
OB_Oracle(OceanBase 集群实例) | RocketMQ(公网 RocketMQ 实例) |
操作步骤
登录 OceanBase 管理控制台,购买数据同步任务。
详情请参见 购买数据同步任务。
在 数据传输 > 数据同步 页面,单击新购买的数据同步任务后的 配置。
如果您需要引用已有的任务配置信息,可以单击 引用配置。详情请参见 引用和清空数据同步任务配置。
在 选择源和目标 页面,配置各项参数。
参数
描述
同步任务名称
建议使用中文、数字和字母的组合。名称中不能包含空格,长度不能超过 64 个字符。
标签
单击文本框,在下拉列表中选择目标标签。您也可以单击 管理标签,进行新建、修改和删除。详情请参见 通过标签管理数据同步任务。
源端
如果您已新建 OceanBase 数据源,请从下拉列表中进行选择。如果未新建,请单击下拉列表中的 新建数据源,在右侧对话框进行新建。参数详情请参见 新建 OceanBase 数据源。
重要源端不支持 OceanBase 数据库的 实例类型 为 OceanBase 租户实例。
目标端
如果您已新建 RocketMQ 数据源,请从下拉列表中进行选择。如果未新建,请单击下拉列表中的 新建数据源,在右侧对话框进行新建。参数详情请参见 新建 RocketMQ 数据源。
单击 下一步。在 选择同步类型 页面,选择当前数据同步任务的同步类型。
同步类型包括 全量同步 和 增量同步。增量同步 仅支持 DML 同步(包括
Insert
、Delete
和Update
),您可以根据需求进行自定义配置。详情请参见 自定义配置 DDL/DML。单击 下一步。在 选择同步对象 页面,选择当前数据同步任务需要同步的对象。
您可以通过 指定对象 和 匹配规则 两个入口选择同步对象。本文为您介绍通过 指定对象 方式选择同步对象的具体操作,配置匹配规则的详情请参见 配置匹配规则 中库到消息队列的通配规则说明和配置方式。
说明如果您在 选择同步类型 步骤已勾选 DDL 同步,建议通过匹配规则方式选择同步对象,以确保所有符合同步对象规则的新增对象都将被同步。如果您通过指定对象方式选择同步对象,则新增对象或重命名后的对象将不会被同步。
同步 OceanBase 数据库的数据至 RocketMQ 时,支持多表到多 Topic 的同步。
在 选择同步对象 区域,选中 指定对象。
在选择区域左侧选中需要同步的对象。
单击 >。
在 将对象映射至 Topic 对话框的 已有 Topic 下拉列表中,搜索并选中需要同步的 Topic。
您也可以输入已有 Topic 后选中显示的 Topic 名称。
单击 确定。
数据传输支持通过文本导入对象,并支持对目标端对象进行更改 Topic、设置行过滤、移除单个对象或全部对象等操作。目标端对象的结构为 Topic>Database>Table。
说明通过 匹配规则 方式选择同步对象时,重命名能力由匹配规则语法覆盖,操作处仅支持设置过滤条件,以及选择分片列和需要同步的列。详情请参见 配置匹配规则。
操作
步骤
导入对象
在选择区域的右侧列表中,单击右上角的 导入对象。
在对话框中,单击 确定。
重要导入会覆盖之前的操作选择,请谨慎操作。
在 导入同步对象 对话框中,导入需要同步的对象。 您可以通过导入 CSV 文件的方式进行设置行过滤条件、设置过滤列和设置分片列等操作。详情请参见 下载和导入同步对象配置。
单击 检验合法性。
通过合法性的检验后,单击 确定。
更改 Topic
数据传输支持对目标对象进行更改 Topic 操作。详情请参见 更改 Topic。
设置
数据传输支持
WHERE
条件实现行过滤,并选择分片列和需要同步的列。在 设置 对话框中,您可以进行以下操作。
在 行过滤条件 区域的文本框中,输入标准的 SQL 语句中的
WHERE
子句,来配置行过滤。详情请参见 SQL 条件过滤数据。在 分片列 下拉列表中,选择目标分片列。您可以选择多个字段作为分片列,该参数为可选参数。
选择分片列时,如果没有特殊情况,默认选择主键即可。如果存在主键负载不均衡的情况,请选择唯一性标识且负载相对均衡的字段作为分片列,避免潜在的性能问题。分片列的主要作用如下:
负载均衡:在目标端可以进行并发写入的情况下,通过分片列区分发送消息需要使用的特定线程。
有序性:由于存在并发写入可能导致的无序问题,数据传输确保在分片列的值相同的情况下,用户接收到的消息是有序的。此处的有序是指变更顺序(DML 对于一列的执行顺序)。
在 选择列 区域,选择需要同步的列。详情请参见 列过滤。
移除/全部移除
数据传输支持在数据映射时,对暂时选中到目标端的单个或多个对象进行移除操作。
移除单个同步对象
在选择区域的右侧列表中,鼠标悬停至目标对象,单击显示的 移除,即可移除该同步对象。
移除全部同步对象
在选择区域的右侧列表中,单击右上角的 全部移除。在对话框中,单击 确定,即可移除全部同步对象。
单击 下一步。在 同步选项 页面,配置各项参数。
全量同步
在 选择同步类型 页面,选中 全量同步,才会显示下述参数。
参数
描述
读取并发配置
该参数用于配置全量同步阶段从源端读取数据的并发数,最大限制为 512.并发数过高可能会造成源端压力过大,影响业务。
写入并发配置
该参数用于配置全量同步阶段往目标端写入数据的并发数,最大限制为 512。并发数过高可能会造成目标端压力过大,影响业务。
全量同步速率限制
您可以根据实际需求决定是否开启全量同步速率限制。如果开启,请设置 RPS(全量同步阶段每秒最多可以同步至目标端的数据行数的最大值限制)和 BPS(全量同步阶段每秒最多可以同步至目标端的数据量的最大值限制)。
说明此处设置的 RPS 和 BPS 仅作为限速限流能力,全量同步实际可以达到的性能受限于源端、目标端、实例规格配置等因素的影响。
增量同步
在 选择同步类型 页面,选中 增量同步,才会显示下述参数。
参数
描述
写入并发配置
该参数用于配置增量同步阶段往目标端写入数据的并发数,最大限制为 512。并发数过高可能会造成目标端压力过大,影响业务。
增量同步速率限制
您可以根据实际需求决定是否开启增量同步速率限制。如果开启,请设置 RPS(增量同步阶段每秒最多可以同步至目标端的数据行数的最大值限制)和 BPS(增量同步阶段每秒最多可以同步至目标端的数据量的最大值限制)。
说明此处设置的 RPS 和 BPS 仅作为限速限流能力,增量同步实际可以达到的性能受限于源端、目标端、实例规格配置等因素的影响。
增量同步起始位点
如果选择同步类型时已选择 全量同步,该参数不会显示。
如果选择同步类型时未选择 全量同步,但选择了 增量同步,请在此处指定同步某个时间节点之后的数据,默认为当前系统时间。详情请参见 设置增量同步位点。
高级选项
参数
描述
序列化方式
控制数据同步至 RocketMQ 的消息格式,目前支持 Default、Canal、DataWorks(支持 2.0 版本)、SharePlex、DefaultExtendColumnType、Debezium、DebeziumFlatten 和 DebeziumSmt。详情请参见 数据格式说明。
重要目前仅 OceanBase 数据库 MySQL 租户支持 Debezium、DebeziumFlatten 和 DebeziumSmt。
分区规则
同步源端数据至 RocketMQ 的规则,目前仅支持 Hash。Hash 表示数据传输使用一定的 Hash 算法,根据主键值或分片列值 Hash 选择 RocketMQ 的队列(MessageQueue)。
业务系统标识(可选)
仅选择 序列化方式 为 DataWorks 时,会显示该参数,用于标识数据的业务系统来源,以便您后续进行自定义处理。该业务系统标识的长度限制为 1~20 个字符。
目标端
参数
描述
请输入生产者群组
生产者群组用于标识一组生产者,能够向多个 Topic 中写入数据。
是否允许消息追踪
如果允许消息追踪,则可以追踪到一条消息从生产者发送到消息队列 RocketMQ 版服务端,再到消费者消费处理,整个过程中的各个相关节点的时间、状态等数据汇聚而成的完整任务信息。该消息轨迹可以作为排查生产环境中的问题强有力的数据支持。
单击 预检查。
在 预检查 环节,数据传输会检测和目标端的连接情况。如果预检查报错:
您可以在排查并处理问题后,重新执行预检查,直至预检查成功。
您也可以单击错误预检查项操作列中的 跳过,会弹出对话框提示您跳过本操作的具体影响,确认可以跳过后,请单击对话框中的 确定。
预检查成功后,单击 启动任务。
如果您暂时无需启动任务,请单击 保存。后续您只能在 同步任务列表 页面手动启动任务或通过批量操作启动任务。批量操作的详情请参见 批量操作数据同步任务。
数据传输支持在数据同步任务运行过程中修改同步对象,详情请参见 查看和修改同步对象及其过滤条件。数据同步任务启动后,会根据选择的同步类型依次执行,详情请参见 查看同步详情。
如果数据同步任务运行报错(通常由于网络不通或进程启动过慢导致),您可以在数据同步任务的列表或详情页面,单击 恢复。