本文为您介绍序列化方式和数据库传输到文本协议的数据格式。
序列化方式的格式说明
您在使用数据传输同步源端数据至 Kafka、DataHub(BLOB 类型)和 RocketMQ 时,支持序列化方式控制数据同步至目标端的消息格式。序列化方式包括 Default、Canal、 DataWorks(支持 2.0 版本)、SharePlex、DefaultExtendColumnType、Debezium、DebeziumFlatten、DebeziumSmt 和 Avro。
目前仅 OceanBase 数据库 MySQL 租户支持序列化方式 Debezium、DebeziumFlatten 和 DebeziumSmt。
目前仅同步 OceanBase 数据库 MySQL 租户的数据至 Kafka 时,支持序列化方式 Avro。
Default JSON 消息格式
数据同步至 Kafka、DataHub(BLOB 类型)和 RocketMQ 时,序列化方式 Default 使用如下 JSON 消息格式。
{
"prevStruct": { // 变更前镜像
"col1": "val1" // 键值对,包含全量键值
},
"postStruct": { // 变更后镜像
"col1": "val1" // 键值对,包含全量键值
},
"allMetaData"{
"checkpoint": "STRING", // 当前同步位点,增量阶段表示同步到的时间位点(秒级时间戳),全量阶段使用主键键值对表示
"record_primary_key": "STRING", // 主键列的名称。如果存在多列使用 \u0001 分割
"record_primary_value": "STRING", // 主键值。如果存在多列使用 \u0001 分割
"source_identity": "STRING", // 源端标识,如果是增量则是 subtopic,如果是全量则没有意义的序号
"dbType": "STRING", // 数据库的类型。包括 MYSQL/ORACLE/OCEANBASE(老模式,兼容使用)/OB_IN_ORACLE_MODE(老模式,兼容使用)/DB2(老模式,兼容使用)/OB_MYSQL/OB_ORACLE/DB2_LUW
"storeDataSequence": "LONG", // 该字段只有在增量场景下 source.json 配置中包含 sequenceEnabled=true 才存在,默认是 true。用于排序,生成规则是一个同步进程中,时间戳 + 不超过五位序号递增。{时间戳}{递增序号}。
"table_name": "STRING", // 使用 SQL 语句进行变更的表的名称
"db": "STRING", // 使用 SQL 语句进行变更的数据库的名称。如果是 OceanBase 数据库,则包含租户,格式为 {tenant}.{database}
"timestamp": "STRING", // 数据变更秒级时间戳,仅增量存在
"uniqueId": "STRING", // 增量中表示 STORE 传递下来的事务序号标识,
"ddlType": "STRING", // DDL 具体类型
},
"recordType": "INSERT/UPDATE/DELETE/HEARTBEAT/DDL" // 变更类型
}
DDL 的 Record 中,仅存在 "ddl" 为列名的键,值为 DDL 语句。
前镜像和后镜像:
prevStruct
:表示增量数据的前镜像信息,即 SQL 执行前的数据。postStruct
:表示增量数据的后镜像信息,即 SQL 执行后的数据。
DELETE
仅存在prevStruct
,INSERT
和DDL
仅存在postStruct
,UPDATE
同时存在prevStruct
和postStruct
,HEARTBEAT
(定期心跳消息)不存在postStruct
和postStruct。
数据示例如下:
INSERT(插入)数据的示例
{ "allMetaData": { "checkpoint": null, "record_primary_key": "id1\u0001id2", "source_identity": null, "record_primary_value": "3\u0001129", "dbType": "OB_MYSQL", "table_name": "table_name", "db": "tenant.database", "timestamp": "1609344671" }, "prevStruct": null, "recordType": "INSERT", "postStruct": { "col1": 3, "col2": 129, "col3": 2147483646, "col4": 9223372036854775806, "col5": 10223372036854775806, "col6": 1.2222, "col7": 9.999999, "col8": "hello world", "col9": "aGVsbG8gd29ybGQ=", "col10": 9.99999999999, "col11": "2020-11-25", "col12": "00:01:02", "col13": "2020-11-25 00:01:02", "col14": "1606233662.012345" } }
UPDATE(更新)数据的示例
{ "allMetaData": { "checkpoint": null, "record_primary_key": "id1\u0001id2", "source_identity": null, "record_primary_value": "3\u0001129", "dbType": "OB_MYSQL", "table_name": "table_name", "db": "tenant.database", "timestamp": "1609344671" }, "prevStruct": { "col1": 3, "col2": 129, "col3": 2147483646, "col4": 9223372036854775806, "col5": 10223372036854775806, "col6": 1.2222, "col7": 9.999999999999, "col8": "hello world", "col9": "aGVsbG8gd29ybGQ=", "col10": 9.999999999999, "col11": "2020-11-25", "col12": "00:01:02", "col13": "2020-11-25 00:01:02", "col14": "1606233662.012345", }, "recordType": "UPDATE", "postStruct": { "col1": 3, "col2": 129, "col3": 2147483646, "col4": 9223372036854775806, "col5": 10223372036854775806, "col6": 1.2222, "col7": 9.999999999999, "col8": "hello world 2020", "col9": "aGVsbG8gd29ybGQ=", "col10": 9.999999999999, "col11": "2020-11-25", "col12": "00:01:02", "col13": "2020-11-25 00:01:02", "col14": "1606233662.012345" } }
DELETE(删除)数据的示例
{ "allMetaData": { "checkpoint": null, "record_primary_key": "id1\u0001id2", "source_identity": null, "record_primary_value": "3\u0001129", "dbType": "OB_MYSQL", "table_name": "table_name", "db": "tenant.database", "timestamp": "1609344671" }, "prevStruct": { "col1": 3, "col2": 129, "col3": 2147483646, "col4": 9223372036854775806, "col5": 10223372036854775806, "col16": 1.2222, "col7": 9.99999999, "col8": "hello world", "col9": "aGVsbG8gd29ybGQ=", "col10": 9.999999999, "col11": "2020-11-25", "col12": "00:01:02", "col13": "2020-11-25 00:01:02", "col14": "1606233662.012345" }, "recordType": "DELETE", "postStruct": null }
DDL 示例
alter table connector_test.all_mysql_type_test add column c90 varchar(30) default "test" comment 'test';
{ "prevStruct": null, "postStruct": { "ddl": "alter table connector_test.all_mysql_type_test add column c90 varchar(30) default \"test\" comment 'test'" }, "allMetaData": { "checkpoint": "1671177057", "dbType": "OB_MYSQL", "storeDataSequence": null, "db": "connector_test", "timestamp": "1671177057", "uniqueId": null, "ddlType": "ALTER_TABLE", "record_primary_key": null, "source_identity": null, "record_primary_value": null, "table_name": "all_mysql_type_test" }, "recordType": "DDL" }
Canal JSON 消息格式
数据同步至 Kafka、DataHub(BLOB 类型)和 RocketMQ 时,序列化方式 Canal 使用如下 JSON 消息格式。
{
"database": "STRING", // 使用 SQL 语句进行变更的数据库的名称。如果是 OceanBase 数据库,仅存在数据库名称,无需租户名称。
"sqlType": {
"col1": "INTEGER" // 变更列类型,数字参考 java.sql.Types
},
"data": [ // 变更后数据键值对,目前只会存在一条消息
{
"col1": "val1"
}
],
"pkNames": [ // 主键列名
"col1"
],
"old": [ // 仅 UPDATE 消息存在。表示 UPDATE 语句变更的列,即变更前的列值
{
"col1": "val1"
}
],
"mysqlType": { // 列类型描述
"col": "STRING"
},
"type": "STRING", // 变更类型
"table": "STRING", // 使用 SQL 语句进行变更的表的名称
"es": "LONG", // 变更时间,毫秒级时间戳
"isDdl": "BOOLEAN", // 是否是 DDL
"ts": "LONG", // 写入目的端时间戳
"sql": "STRING" // 当前是空
}
数据示例如下:
INSERT
(插入)数据的示例{ "database": "database", "sqlType": { "col1": 93, "col2": 12, "col3": 6, "col4": 8, "col5": 5, "col6": 92, "col7": 4, "col8": -5, "col9": 2004, "col10": -6, "col11": 91, "col12": 3, "col13": -5, "col14": 93 }, "data": [ { "col1": "2020-11-25 00:01:02", "col2": "hello world", "col3": 1.2222, "col4": 9.99999999999999909326625337248461995470488734032045693707225049331647881341002217023668530611028595157578301758491822824378438792553200763769833775473829862512856683413461939989729065436937279228852476622948659167943435544622149348072943613294167216662821737555414480159115639791276054897201420389770580351533960771506199055664889770260291710977826725024401716523031627390652604144008597950935492433262042405635563993262949691698930975461134804791235994697938405200089317860731205010159117711704697471514344499487123311264707354172378099538737850219826145102366279591379660471881259976727356521602405329789906247763521525981391443887618575275588619928089116905061711975308467857756405810961619074331866883961080943542712559830853980002984826569445431232452392578125E-308, "col5": 129, "col6": "00:01:02", "col7": 2147483646, "col8": 9223372036854775806, "col9": "aGVsbG8gd29ybGQ=", "col10": 3, "col11": "2020-11-25", "col12": 9.99999999999999909326625337248461995470488734032045693707225049331647881341002217023668530611028595157578301758491822824378438792553200763769833775473829862512856683413461939989729065436937279228852476622948659167943435544622149348072943613294167216662821737555414480159115639791276054897201420389770580351533960771506199055664889770260291710977826725024401716523031627390652604144008597950935492433262042405635563993262949691698930975461134804791235994697938405200089317860731205010159117711704697471514344499487123311264707354172378099538737850219826145102366279591379660471881259976727356521602405329789906247763521525981391443887618575275588619928089116905061711975308467857756405810961619074331866883961080943542712559830853980002984826569445431232452392578125E-308, "col13": 10223372036854775806, "col14": "1606233662.012345" } ], "pkNames": [ "col1", "col2" ], "old": null, "mysqlType": { "col1": "datetime", "col2": "varchar", "col3": "float", "col4": "double", "col5": "smallint", "col6": "time", "col7": "int", "col8": "int64", "col9": "blob", "col10": "tinyint", "col11": "date", "col12": "decimal", "col13": "bigint", "col14": "timestamp" }, "type": "INSERT", "table": "table", "es": 1609344671000, "isDdl": false, "ts": 1618323429026, "sql": "" }
UPDATE
(更新)数据的示例{ "database": "database", "sqlType": { "col1": 93, "col2": 12, "col3": 6, "col4": 8, "col5": 5, "col6": 92, "col7": 4, "col8": -5, "col9": 2004, "col10": -6, "col11": 91, "col12": 3, "col13": -5, "col14": 93 }, "data": [ { "col1": "2020-11-25 00:01:02", "col2": "hello world 2020", "col3": 1.2222, "col4": 9.99999999999999909326625337248461995470488734032045693707225049331647881341002217023668530611028595157578301758491822824378438792553200763769833775473829862512856683413461939989729065436937279228852476622948659167943435544622149348072943613294167216662821737555414480159115639791276054897201420389770580351533960771506199055664889770260291710977826725024401716523031627390652604144008597950935492433262042405635563993262949691698930975461134804791235994697938405200089317860731205010159117711704697471514344499487123311264707354172378099538737850219826145102366279591379660471881259976727356521602405329789906247763521525981391443887618575275588619928089116905061711975308467857756405810961619074331866883961080943542712559830853980002984826569445431232452392578125E-308, "col5": 129, "col6": "00:01:02", "col7": 2147483646, "col8": 9223372036854775806, "col9": "aGVsbG8gd29ybGQ=", "col10": 3, "col11": "2020-11-25", "col12": 9.99999999999999909326625337248461995470488734032045693707225049331647881341002217023668530611028595157578301758491822824378438792553200763769833775473829862512856683413461939989729065436937279228852476622948659167943435544622149348072943613294167216662821737555414480159115639791276054897201420389770580351533960771506199055664889770260291710977826725024401716523031627390652604144008597950935492433262042405635563993262949691698930975461134804791235994697938405200089317860731205010159117711704697471514344499487123311264707354172378099538737850219826145102366279591379660471881259976727356521602405329789906247763521525981391443887618575275588619928089116905061711975308467857756405810961619074331866883961080943542712559830853980002984826569445431232452392578125E-308, "col13": 10223372036854775806, "col14": "1606233662.012345" } ], "pkNames": [ "col1", "col2" ], "old": [ { "string": "hello world" } ], "mysqlType": { "col1": "datetime", "col2": "varchar", "col3": "float", "col4": "double", "col5": "smallint", "col6": "time", "col7": "int", "col8": "int64", "col9": "blob", "col10": "tinyint", "col11": "date", "col12": "decimal", "col13": "bigint", "col14": "timestamp" }, "type": "UPDATE", "table": "table", "es": 1609344671000, "isDdl": false, "ts": 1618364572908, "sql": "" }
DELETE
(删除)数据的示例{ "database": "database", "sqlType": { "col1": 93, "col2": 12, "col3": 6, "col4": 8, "col5": 5, "col6": 92, "col7": 4, "col8": -5, "col9": 2004, "col10": -6, "col11": 91, "col12": 3, "col13": -5, "col14": 93 }, "data": [ { "col1": "2020-11-25 00:01:02", "col2": "hello world", "col3": 1.2222, "col4": 9.99999999999999909326625337248461995470488734032045693707225049331647881341002217023668530611028595157578301758491822824378438792553200763769833775473829862512856683413461939989729065436937279228852476622948659167943435544622149348072943613294167216662821737555414480159115639791276054897201420389770580351533960771506199055664889770260291710977826725024401716523031627390652604144008597950935492433262042405635563993262949691698930975461134804791235994697938405200089317860731205010159117711704697471514344499487123311264707354172378099538737850219826145102366279591379660471881259976727356521602405329789906247763521525981391443887618575275588619928089116905061711975308467857756405810961619074331866883961080943542712559830853980002984826569445431232452392578125E-308, "col5": 129, "col6": "00:01:02", "col7": 2147483646, "col8": 9223372036854775806, "col9": "aGVsbG8gd29ybGQ=", "col10": 3, "col11": "2020-11-25", "col12": 9.99999999999999909326625337248461995470488734032045693707225049331647881341002217023668530611028595157578301758491822824378438792553200763769833775473829862512856683413461939989729065436937279228852476622948659167943435544622149348072943613294167216662821737555414480159115639791276054897201420389770580351533960771506199055664889770260291710977826725024401716523031627390652604144008597950935492433262042405635563993262949691698930975461134804791235994697938405200089317860731205010159117711704697471514344499487123311264707354172378099538737850219826145102366279591379660471881259976727356521602405329789906247763521525981391443887618575275588619928089116905061711975308467857756405810961619074331866883961080943542712559830853980002984826569445431232452392578125E-308, "col13": 10223372036854775806, "col14": "1606233662.012345" } ], "pkNames": [ "int8", "int16" ], "old": null, "mysqlType": { "col1": "datetime", "col2": "varchar", "col3": "float", "col4": "double", "col5": "smallint", "col6": "time", "col7": "int", "col8": "int64", "col9": "blob", "col10": "tinyint", "col11": "date", "col12": "decimal", "col13": "bigint", "col14": "timestamp" }, "type": "DELETE", "table": "table", "es": 1609344671000, "isDdl": false, "ts": 1618364660278, "sql": "" }
DDL 示例
alter table connector_test.all_mysql_type_test add column c90 varchar(30) default "test" comment 'test';
{ "database": "connector_test", "sqlType": null, "data": null, "pkNames": null, "old": null, "mysqlType": null, "type": "ALTER", "table": "all_mysql_type_test", "es": 1671177209000, "isDdl": true, "ts": 1671177291475, "sql": "alter table connector_test.all_mysql_type_test add column c90 varchar(30) default \"test\" comment 'test'" }
DataWorks JSON 消息格式
数据同步至 Kafka、DataHub(BLOB 类型)和 RocketMQ 时,序列化方式 DataWorks 使用如下 JSON 消息格式。
{
"version": "2.0", //协议版本,目前仅支持 DataWorks 2.0 版本
"schema": { //变更的元数据信息,仅指定列名与列类型信息
"source": { //变更来源信息
"dbType": "mysql", //数据源类型
"dbVersion": "5.7.35", //数据库版本
"dbName": "myDatabase", //数据库名称
"schema": "mySchema", //Schema 名称,存在 Schema 的系统必填
"table": "tableName" //表名
},
"column": [ //变更的数据列信息,更新目标表记录内容
{
"name": "id",
"type": "bigint"
},
{
"name": "name",
"type": "varchar(20)"
},
{
"name": "mydata",
"type": "binary"
},
{
"name": "ts",
"type": "datetime"
}
],
"pk": [ //有主键或唯⼀键必填,否则可以不填
"pkName1",
"pkName2"
]
},
"payload": {
"before": {
"data": {
"id": 111,
"name": "scooter",
"mydata": "[base64 string]", //如果是二进制类型,需要进行 Base64 编码
"ts": 1590315269000.123456789 //时间戳,其整数部分 13 位,小数部分 9 位
}
},
"after": {
"data": {
"id": 222,
"name": "donald",
"mydata": "[base64 string]",
"ts": 1590315269000
}
},
"op": "INSERT/UPDATE/DELETE/HEARTBEAT/TRANSACTION_BEGIN/TRANSACTION_END/CREATE/ALTER/ERASE/QUERY/TRUNCATE/RENAME/CINDEX/DINDEX/GTID/XACOMMIT/XAROLLBACK/...", //大小写敏感
"timestamp": {
"eventTime": 1620457659000 // 变更在源端库发生时间,毫秒精度的 13 位时间戳
},
"ddl": {
"text": "ADD COLUMN ..."
},
"scn": "⾃增 ID"
},
"extend": { //extend 扩展字段,用于后续扩展需求。如果没有,可以不填
"load_fm": "CIBS" //记录来源系统。例如,"CIBS"
}
}
同步任务心跳消息:
{
"version": "2.0", //协议版本
"payload": {
"timestamp": {
"eventTime": 1620457659000 //⼼跳包时间
},
"op": "HEARTBEAT" //标识是⼼跳包
}
}
数据示例如下:
INSERT
(插入)数据的示例{ "version": "2.0", "schema": { "source": { "dbType": "ob_mysql", "dbVersion": null, "dbName": "db", "schema": null, "table": "tab" }, "column": [ { "name": "int8", "type": "TINYINT" }, { "name": "int16", "type": "SMALLINT" }, { "name": "int32", "type": "INT" }, { "name": "int64", "type": "INT64" }, { "name": "float32", "type": "FLOAT" }, { "name": "float64", "type": "DOUBLE" }, { "name": "bigInt", "type": "BIGINT" }, { "name": "boolean", "type": "BOOLEAN" }, { "name": "string", "type": "VARCHAR" }, { "name": "bytes", "type": "BLOB" }, { "name": "decimal", "type": "DECIMAL" }, { "name": "localDate", "type": "DATE" }, { "name": "localTime", "type": "TIME" }, { "name": "localDateTime", "type": "DATETIME" }, { "name": "timestamp", "type": "TIMESTAMP" }, { "name": "zonedDateTime", "type": "ZONED_DATETIME" }, { "name": "intervalDayToSecond", "type": "INTERVAL_DAY_TO_SECOND" }, { "name": "intervalYearToMonth", "type": "INTERVAL_YEAR_TO_MONTH" } ], "pk": [ "pkName1", "pkName12" ] }, "payload": { "before": null, "after": { "data": { "col1": 3, "col2": 129, "col3": 2147483646, "col4": 9223372036854775806, "col5": 1.2222, "col6": 0.0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000999999999999999909326625337248461995470488734032045693707225049331647881341002217023668530611028595157578301758491822824378438792553200763769833775473829862512856683413461939989729065436937279228852476622948659167943435544622149348072943613294167216662821737555414480159115639791276054897201420389770580351533960771506199055664889770260291710977826725024401716523031627390652604144008597950935492433262042405635563993262949691698930975461134804791235994697938405200089317860731205010159117711704697471514344499487123311264707354172378099538737850219826145102366279591379660471881259976727356521602405329789906247763521525981391443887618575275588619928089116905061711975308467857756405810961619074331866883961080943542712559830853980002984826569445431232452392578125, "col7": 10223372036854775806, "col8": 1, "col9": "hello world", "col10": "aGVsbG8gd29ybGQ=", "col11": 0.0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000999999999999999909326625337248461995470488734032045693707225049331647881341002217023668530611028595157578301758491822824378438792553200763769833775473829862512856683413461939989729065436937279228852476622948659167943435544622149348072943613294167216662821737555414480159115639791276054897201420389770580351533960771506199055664889770260291710977826725024401716523031627390652604144008597950935492433262042405635563993262949691698930975461134804791235994697938405200089317860731205010159117711704697471514344499487123311264707354172378099538737850219826145102366279591379660471881259976727356521602405329789906247763521525981391443887618575275588619928089116905061711975308467857756405810961619074331866883961080943542712559830853980002984826569445431232452392578125, "col12": "2020-11-25", "col13": "00:01:02", "col14": "2020-11-25 00:01:02", "col15": "1606233662.012345", "col16": "2020-11-25 00:01:02.012345 Asia/Shanghai", "col17": "INTERVAL '3' DAY", "col18": "INTERVAL '4' YEAR" } }, "op": "INSERT", "timestamp": { "eventTime": 1647581000000, "systemTime": 1647581000795, "checkpointTime": 1647581000 }, "ddl": null, "scn": "null" }, "extend": { "load_fm": "test" } }
UPDATE
(更新)数据的示例{ "version": "2.0", "schema": { "source": { "dbType": "ob_mysql", "dbVersion": null, "dbName": "db", "schema": null, "table": "tab" }, "column": [ { "name": "int8", "type": "TINYINT" }, { "name": "int16", "type": "SMALLINT" }, { "name": "int32", "type": "INT" }, { "name": "int64", "type": "INT64" }, { "name": "float32", "type": "FLOAT" }, { "name": "float64", "type": "DOUBLE" }, { "name": "bigInt", "type": "BIGINT" }, { "name": "boolean", "type": "BOOLEAN" }, { "name": "string", "type": "VARCHAR" }, { "name": "bytes", "type": "BLOB" }, { "name": "decimal", "type": "DECIMAL" }, { "name": "localDate", "type": "DATE" }, { "name": "localTime", "type": "TIME" }, { "name": "localDateTime", "type": "DATETIME" }, { "name": "timestamp", "type": "TIMESTAMP" }, { "name": "zonedDateTime", "type": "ZONED_DATETIME" }, { "name": "intervalDayToSecond", "type": "INTERVAL_DAY_TO_SECOND" }, { "name": "intervalYearToMonth", "type": "INTERVAL_YEAR_TO_MONTH" } ], "pk": [ "pkName1", "pkName2" ] }, "payload": { "before": { "data": { "col1": 3, "col2": 129, "col3": 2147483646, "col4": 9223372036854775806, "col5": 1.2222, "col6": 0.0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000999999999999999909326625337248461995470488734032045693707225049331647881341002217023668530611028595157578301758491822824378438792553200763769833775473829862512856683413461939989729065436937279228852476622948659167943435544622149348072943613294167216662821737555414480159115639791276054897201420389770580351533960771506199055664889770260291710977826725024401716523031627390652604144008597950935492433262042405635563993262949691698930975461134804791235994697938405200089317860731205010159117711704697471514344499487123311264707354172378099538737850219826145102366279591379660471881259976727356521602405329789906247763521525981391443887618575275588619928089116905061711975308467857756405810961619074331866883961080943542712559830853980002984826569445431232452392578125, "col7": 10223372036854775806, "col8": 1, "col9": "hello world", "col10": "aGVsbG8gd29ybGQ=", "col11": 0.0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000999999999999999909326625337248461995470488734032045693707225049331647881341002217023668530611028595157578301758491822824378438792553200763769833775473829862512856683413461939989729065436937279228852476622948659167943435544622149348072943613294167216662821737555414480159115639791276054897201420389770580351533960771506199055664889770260291710977826725024401716523031627390652604144008597950935492433262042405635563993262949691698930975461134804791235994697938405200089317860731205010159117711704697471514344499487123311264707354172378099538737850219826145102366279591379660471881259976727356521602405329789906247763521525981391443887618575275588619928089116905061711975308467857756405810961619074331866883961080943542712559830853980002984826569445431232452392578125, "col12": "2020-11-25", "col13": "00:01:02", "col14": "2020-11-25 00:01:02", "col15": "1606233662.012345", "col16": "2020-11-25 00:01:02.012345 Asia/Shanghai", "col17": "INTERVAL '3' DAY", "col18": "INTERVAL '4' YEAR" } }, "after": { "data": { "col1": 3, "col2": 129, "col3": 2147483646, "col4": 9223372036854775806, "col5": 1.2222, "col6": 0.0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000999999999999999909326625337248461995470488734032045693707225049331647881341002217023668530611028595157578301758491822824378438792553200763769833775473829862512856683413461939989729065436937279228852476622948659167943435544622149348072943613294167216662821737555414480159115639791276054897201420389770580351533960771506199055664889770260291710977826725024401716523031627390652604144008597950935492433262042405635563993262949691698930975461134804791235994697938405200089317860731205010159117711704697471514344499487123311264707354172378099538737850219826145102366279591379660471881259976727356521602405329789906247763521525981391443887618575275588619928089116905061711975308467857756405810961619074331866883961080943542712559830853980002984826569445431232452392578125, "col7": 10223372036854775806, "col8": 1, "col9": "hello world 2020", "col10": "aGVsbG8gd29ybGQ=", "col11": 0.0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000999999999999999909326625337248461995470488734032045693707225049331647881341002217023668530611028595157578301758491822824378438792553200763769833775473829862512856683413461939989729065436937279228852476622948659167943435544622149348072943613294167216662821737555414480159115639791276054897201420389770580351533960771506199055664889770260291710977826725024401716523031627390652604144008597950935492433262042405635563993262949691698930975461134804791235994697938405200089317860731205010159117711704697471514344499487123311264707354172378099538737850219826145102366279591379660471881259976727356521602405329789906247763521525981391443887618575275588619928089116905061711975308467857756405810961619074331866883961080943542712559830853980002984826569445431232452392578125, "col12": "2020-11-25", "col13": "00:01:02", "col14": "2020-11-25 00:01:02", "col15": "1606233662.012345", "col16": "2020-11-25 00:01:02.012345 Asia/Shanghai", "col17": "INTERVAL '3' DAY", "col18": "INTERVAL '4' YEAR" } }, "op": "UPDATE", "timestamp": { "eventTime": 1647581038000, "systemTime": 1647581038674, "checkpointTime": 1647581038 }, "ddl": null, "scn": "null" }, "extend": { "load_fm": "test" } }
DELETE
(删除)数据的示例{ "version": "2.0", "schema": { "source": { "dbType": "ob_mysql", "dbVersion": null, "dbName": "db", "schema": null, "table": "tab" }, "column": [ { "name": "int8", "type": "TINYINT" }, { "name": "int16", "type": "SMALLINT" }, { "name": "int32", "type": "INT" }, { "name": "int64", "type": "INT64" }, { "name": "float32", "type": "FLOAT" }, { "name": "float64", "type": "DOUBLE" }, { "name": "bigInt", "type": "BIGINT" }, { "name": "boolean", "type": "BOOLEAN" }, { "name": "string", "type": "VARCHAR" }, { "name": "bytes", "type": "BLOB" }, { "name": "decimal", "type": "DECIMAL" }, { "name": "localDate", "type": "DATE" }, { "name": "localTime", "type": "TIME" }, { "name": "localDateTime", "type": "DATETIME" }, { "name": "timestamp", "type": "TIMESTAMP" }, { "name": "zonedDateTime", "type": "ZONED_DATETIME" }, { "name": "intervalDayToSecond", "type": "INTERVAL_DAY_TO_SECOND" }, { "name": "intervalYearToMonth", "type": "INTERVAL_YEAR_TO_MONTH" } ], "pk": [ "pkName1", "pkName2" ] }, "payload": { "before": { "data": { "col1": 3, "col2": 129, "col3": 2147483646, "col4": 9223372036854775806, "col5": 1.2222, "col6": 0.0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000999999999999999909326625337248461995470488734032045693707225049331647881341002217023668530611028595157578301758491822824378438792553200763769833775473829862512856683413461939989729065436937279228852476622948659167943435544622149348072943613294167216662821737555414480159115639791276054897201420389770580351533960771506199055664889770260291710977826725024401716523031627390652604144008597950935492433262042405635563993262949691698930975461134804791235994697938405200089317860731205010159117711704697471514344499487123311264707354172378099538737850219826145102366279591379660471881259976727356521602405329789906247763521525981391443887618575275588619928089116905061711975308467857756405810961619074331866883961080943542712559830853980002984826569445431232452392578125, "col7": 10223372036854775806, "col8": 1, "col9": "hello world", "col10": "aGVsbG8gd29ybGQ=", "col11": 0.0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000999999999999999909326625337248461995470488734032045693707225049331647881341002217023668530611028595157578301758491822824378438792553200763769833775473829862512856683413461939989729065436937279228852476622948659167943435544622149348072943613294167216662821737555414480159115639791276054897201420389770580351533960771506199055664889770260291710977826725024401716523031627390652604144008597950935492433262042405635563993262949691698930975461134804791235994697938405200089317860731205010159117711704697471514344499487123311264707354172378099538737850219826145102366279591379660471881259976727356521602405329789906247763521525981391443887618575275588619928089116905061711975308467857756405810961619074331866883961080943542712559830853980002984826569445431232452392578125, "col12": "2020-11-25", "col13": "00:01:02", "col14": "2020-11-25 00:01:02", "col15": "1606233662.012345", "col16": "2020-11-25 00:01:02.012345 Asia/Shanghai", "col17": "INTERVAL '3' DAY", "col18": "INTERVAL '4' YEAR" } }, "after": null, "op": "DELETE", "timestamp": { "eventTime": 1647581072000, "systemTime": 1647581072976, "checkpointTime": 1647581072 }, "ddl": null, "scn": "null" }, "extend": { "load_fm": "test" } }
DDL 示例
alter table connector_test.all_mysql_type_test add column c90 varchar(30) default "test" comment 'test';
{ "version": "2.0", "schema": { "source": { "dbType": "ob_mysql", "dbVersion": null, "dbName": "connector_test", "schema": null, "table": "all_mysql_type_test" }, "column": null, "pk": null }, "payload": { "before": null, "after": null, "op": "ALTER", "timestamp": { "eventTime": 1671177209000, "systemTime": 1671177291485, "checkpointTime": 1671177200 }, "ddl": { "text": "alter table connector_test.all_mysql_type_test add column c90 varchar(30) default \"test\" comment 'test'" }, "scn": "null" }, "extend": {} }
SharePlex JSON 消息格式
数据同步至 Kafka、DataHub(BLOB 类型)和 RocketMQ 时,序列化方式 SharePlex 使用如下 JSON 消息格式。
{
"data": { // 变更数据键值对,如果是 INSERT / DELETE 是全量值,如果是 UPDETE 只有变更值
"col1": "val1"
},
"meta": {
"time": "YYYY-MM-DDTHH:mm:ss", // 变更时间
"op": "", // 变更类型,包括 ins/upd/del/ddl
"posttime": "YYYY-MM-DDTHH:mm:ss", // 写入目标端的时间
"idx": "STRING", //消息在事务中的索引/索引的消息数量。该参数已废弃。
"size": NUMBER, //事务内消息数量。该参数已废弃。
"seq": "STRING", // 排序序号,需要配合源端打开 transactionEnabled 才能存在
"table": "STRING", // SQL 变更库表名 {database}.{table}
"rowid": "STRING", // {变更库表名}-{主键值使用\u0001} 分割
"trans": "STRING", // 事务 ID
"scn": "STRING" // 该字段只有在增量场景下 source.json 配置中包含 sequenceEnabled=true 才存在,默认是 true。用于排序,生成规则是一个同步进程中,时间戳 + 不超过五位序号递增。
},
"key": { // 仅 UPDATE 存在,表示变更前的值
}
}
数据示例如下:
INSERT
(插入)数据的示例{ "data": { "col1": "2020-11-25 00:01:02", "col2": "hello world", "col3": "INTERVAL '3' DAY", "col4": 1.2222, "col5": 9.99999999999999909326625337248461995470488734032045693707225049331647881341002217023668530611028595157578301758491822824378438792553200763769833775473829862512856683413461939989729065436937279228852476622948659167943435544622149348072943613294167216662821737555414480159115639791276054897201420389770580351533960771506199055664889770260291710977826725024401716523031627390652604144008597950935492433262042405635563993262949691698930975461134804791235994697938405200089317860731205010159117711704697471514344499487123311264707354172378099538737850219826145102366279591379660471881259976727356521602405329789906247763521525981391443887618575275588619928089116905061711975308467857756405810961619074331866883961080943542712559830853980002984826569445431232452392578125E-308, "col6": 129, "col7": "00:01:02", "col8": 1, "col9": "2020-11-25 00:01:02.012345 Asia/Shanghai", "col10": 2147483646, "col11": 9223372036854775806, "col12": "aGVsbG8gd29ybGQ=", "col13": "INTERVAL '4' YEAR", "col14": 3, "col15": "2020-11-25", "col16": 9.99999999999999909326625337248461995470488734032045693707225049331647881341002217023668530611028595157578301758491822824378438792553200763769833775473829862512856683413461939989729065436937279228852476622948659167943435544622149348072943613294167216662821737555414480159115639791276054897201420389770580351533960771506199055664889770260291710977826725024401716523031627390652604144008597950935492433262042405635563993262949691698930975461134804791235994697938405200089317860731205010159117711704697471514344499487123311264707354172378099538737850219826145102366279591379660471881259976727356521602405329789906247763521525981391443887618575275588619928089116905061711975308467857756405810961619074331866883961080943542712559830853980002984826569445431232452392578125E-308, "col17": 10223372036854775806, "col18": "1606233662.012345" }, "meta": { "posttime": "2020-12-07T13:22:00", "op": "ins", "size": 10, "time": "2020-11-25T00:01:02", "idx": "1/10", "seq": 1, "table": "mock_database.mock_table", "rowid":"mock_database.mock_table-3129", "trans": "shareplex_transaction_id", "scn": "123456789" } }
UPDATE
(更新)数据的示例{ "data": { "string": "hello world 2020" }, "meta": { "posttime": "2020-12-07T13:59:09", "op": "upd", "size": 10, "time": "2020-11-25T00:01:02", "idx": "1/10", "seq": 1, "table": "mock_database.mock_table", "rowid": "mock_database.mock_table-3\u0001129", "trans": "shareplex_transaction_id", "scn": "123456789" }, "key": { "col1": "2020-11-25 00:01:02", "col2": "hello world", "col3": "INTERVAL '3' DAY", "col4": 1.2222, "col5": 9.99999999999999909326625337248461995470488734032045693707225049331647881341002217023668530611028595157578301758491822824378438792553200763769833775473829862512856683413461939989729065436937279228852476622948659167943435544622149348072943613294167216662821737555414480159115639791276054897201420389770580351533960771506199055664889770260291710977826725024401716523031627390652604144008597950935492433262042405635563993262949691698930975461134804791235994697938405200089317860731205010159117711704697471514344499487123311264707354172378099538737850219826145102366279591379660471881259976727356521602405329789906247763521525981391443887618575275588619928089116905061711975308467857756405810961619074331866883961080943542712559830853980002984826569445431232452392578125E-308, "col6": 129, "col7": "00:01:02", "col8": 1, "col9": "2020-11-25 00:01:02.012345 Asia/Shanghai", "col10": 2147483646, "col11": 9223372036854775806, "col12": "aGVsbG8gd29ybGQ=", "col13": "INTERVAL '4' YEAR", "col14": 3, "col15": "2020-11-25", "col16": 9.99999999999999909326625337248461995470488734032045693707225049331647881341002217023668530611028595157578301758491822824378438792553200763769833775473829862512856683413461939989729065436937279228852476622948659167943435544622149348072943613294167216662821737555414480159115639791276054897201420389770580351533960771506199055664889770260291710977826725024401716523031627390652604144008597950935492433262042405635563993262949691698930975461134804791235994697938405200089317860731205010159117711704697471514344499487123311264707354172378099538737850219826145102366279591379660471881259976727356521602405329789906247763521525981391443887618575275588619928089116905061711975308467857756405810961619074331866883961080943542712559830853980002984826569445431232452392578125E-308, "col17": 10223372036854775806, "col18": "1606233662.012345" } }
DELETE
(删除)数据的示例{ "data": { "col1": "2020-11-25 00:01:02", "col2": "hello world", "col3": "INTERVAL '3' DAY", "col4": 1.2222, "col5": 9.99999999999999909326625337248461995470488734032045693707225049331647881341002217023668530611028595157578301758491822824378438792553200763769833775473829862512856683413461939989729065436937279228852476622948659167943435544622149348072943613294167216662821737555414480159115639791276054897201420389770580351533960771506199055664889770260291710977826725024401716523031627390652604144008597950935492433262042405635563993262949691698930975461134804791235994697938405200089317860731205010159117711704697471514344499487123311264707354172378099538737850219826145102366279591379660471881259976727356521602405329789906247763521525981391443887618575275588619928089116905061711975308467857756405810961619074331866883961080943542712559830853980002984826569445431232452392578125E-308, "col6": 129, "col7": "00:01:02", "col8": 1, "col9": "2020-11-25 00:01:02.012345 Asia/Shanghai", "col10": 2147483646, "col11": 9223372036854775806, "col12": "aGVsbG8gd29ybGQ=", "col13": "INTERVAL '4' YEAR", "col14": 3, "col15": "2020-11-25", "col16": 9.99999999999999909326625337248461995470488734032045693707225049331647881341002217023668530611028595157578301758491822824378438792553200763769833775473829862512856683413461939989729065436937279228852476622948659167943435544622149348072943613294167216662821737555414480159115639791276054897201420389770580351533960771506199055664889770260291710977826725024401716523031627390652604144008597950935492433262042405635563993262949691698930975461134804791235994697938405200089317860731205010159117711704697471514344499487123311264707354172378099538737850219826145102366279591379660471881259976727356521602405329789906247763521525981391443887618575275588619928089116905061711975308467857756405810961619074331866883961080943542712559830853980002984826569445431232452392578125E-308, "col17": 10223372036854775806, "col18": "1606233662.012345" }, "meta": { "posttime": "2020-12-07T13:34:10", "op": "del", "size": 10, "time": "2020-11-25T00:01:02", "idx": "1/10", "seq": 1, "table": "mock_database.mock_table", "rowid": "mock_database.mock_table-3\u0001129", "trans": "shareplex_transaction_id", "scn": "123456789" } }
DDL 示例
alter table connector_test.all_mysql_type_test add column c90 varchar(30) default "test" comment 'test';
{ "data": {}, "meta": { "posttime": "2022-12-16T15:54:51", "op": "ddl", "size": 0, "time": "2022-12-16T15:53:29", "idx": "0/0", "seq": 0, "table": "connector_test.all_mysql_type_test", "rowid": "connector_test.all_mysql_type_test-", "trans": null, "scn": "null" }, "sql": { "ddl": "alter table connector_test.all_mysql_type_test add column c90 varchar(30) default \"test\" comment 'test'" } }
DefaultExtendColumnType JSON 消息格式
数据同步至 Kafka、DataHub(BLOB 类型)和 RocketMQ 时,序列化方式 DefaultExtendColumnType 使用如下 JSON 消息格式。
DefaultExtendColumnType JSON 消息格式会在 DEFAULT
的基础上,在镜像内增加一个字段 __light_type
,用于表示字段的数据类型。
{
"prevStruct": { // 变更前镜像
},
"postStruct": { // 变更后镜像
"__light_type": {
"col": { // 字段的名称
"schemaType": "type" // 值的类型
}
}
},
"allMetaData": {
}
}
数据示例如下:
INSERT
(插入)数据的示例{ "allMetaData": { "checkpoint": null, "record_primary_key": "id1\u0001id2", "source_identity": null, "record_primary_value": "3\u0001129", "dbType": "OB_MYSQL", "table_name": "table", "db": "database", "timestamp": "1609344671" }, "prevStruct": null, "recordType": "INSERT", "postStruct": { "col1": 3, "col2": 129, "col3": 2147483646, "col4": 9223372036854775806, "col5": 10223372036854775806, "col6": 1.2222, "col7": 9.99999999999999909326625337248461995470488734032045693707225049331647881341002217023668530611028595157578301758491822824378438792553200763769833775473829862512856683413461939989729065436937279228852476622948659167943435544622149348072943613294167216662821737555414480159115639791276054897201420389770580351533960771506199055664889770260291710977826725024401716523031627390652604144008597950935492433262042405635563993262949691698930975461134804791235994697938405200089317860731205010159117711704697471514344499487123311264707354172378099538737850219826145102366279591379660471881259976727356521602405329789906247763521525981391443887618575275588619928089116905061711975308467857756405810961619074331866883961080943542712559830853980002984826569445431232452392578125E-308, "col8": "hello world", "col9": "aGVsbG8gd29ybGQ=", "col10": 9.99999999999999909326625337248461995470488734032045693707225049331647881341002217023668530611028595157578301758491822824378438792553200763769833775473829862512856683413461939989729065436937279228852476622948659167943435544622149348072943613294167216662821737555414480159115639791276054897201420389770580351533960771506199055664889770260291710977826725024401716523031627390652604144008597950935492433262042405635563993262949691698930975461134804791235994697938405200089317860731205010159117711704697471514344499487123311264707354172378099538737850219826145102366279591379660471881259976727356521602405329789906247763521525981391443887618575275588619928089116905061711975308467857756405810961619074331866883961080943542712559830853980002984826569445431232452392578125E-308, "col11": "2020-11-25", "col12": "00:01:02", "col13": "2020-11-25 00:01:02", "col14": "1606233662.012345", "__light_type": { "int8": { "schemaType": "TINYINT" }, "int16": { "schemaType": "SMALLINT" }, "int32": { "schemaType": "INT" }, "int64": { "schemaType": "INT64" }, "bigInt": { "schemaType": "BIGINT" }, "float32": { "schemaType": "FLOAT" }, "float64": { "schemaType": "DOUBLE" }, "string": { "schemaType": "VARCHAR" }, "bytes": { "schemaType": "BLOB" }, "decimal": { "schemaType": "DECIMAL" }, "localDate": { "schemaType": "DATE" }, "localTime": { "schemaType": "TIME" }, "localDateTime": { "schemaType": "DATETIME" }, "timestamp_in_long": { "schemaType": "TIMESTAMP" } } } }
UPDATE
(更新)数据的示例{ "allMetaData": { "checkpoint": null, "record_primary_key": "id1\u0001id2", "source_identity": null, "record_primary_value": "3\u0001129", "dbType": "OB_MYSQL", "table_name": "table", "db": "database", "timestamp": "1609344671" }, "prevStruct": { "col1": 3, "col2": 129, "col3": 2147483646, "col4": 9223372036854775806, "col5": 10223372036854775806, "col6": 1.2222, "col7": 9.99999999999999909326625337248461995470488734032045693707225049331647881341002217023668530611028595157578301758491822824378438792553200763769833775473829862512856683413461939989729065436937279228852476622948659167943435544622149348072943613294167216662821737555414480159115639791276054897201420389770580351533960771506199055664889770260291710977826725024401716523031627390652604144008597950935492433262042405635563993262949691698930975461134804791235994697938405200089317860731205010159117711704697471514344499487123311264707354172378099538737850219826145102366279591379660471881259976727356521602405329789906247763521525981391443887618575275588619928089116905061711975308467857756405810961619074331866883961080943542712559830853980002984826569445431232452392578125E-308, "col8": "hello world", "col9": "aGVsbG8gd29ybGQ=", "col10": 9.99999999999999909326625337248461995470488734032045693707225049331647881341002217023668530611028595157578301758491822824378438792553200763769833775473829862512856683413461939989729065436937279228852476622948659167943435544622149348072943613294167216662821737555414480159115639791276054897201420389770580351533960771506199055664889770260291710977826725024401716523031627390652604144008597950935492433262042405635563993262949691698930975461134804791235994697938405200089317860731205010159117711704697471514344499487123311264707354172378099538737850219826145102366279591379660471881259976727356521602405329789906247763521525981391443887618575275588619928089116905061711975308467857756405810961619074331866883961080943542712559830853980002984826569445431232452392578125E-308, "col11": "2020-11-25", "col12": "00:01:02", "col13": "2020-11-25 00:01:02", "col14": "1606233662.012345", "__light_type": { "int8": { "schemaType": "TINYINT" }, "int16": { "schemaType": "SMALLINT" }, "int32": { "schemaType": "INT" }, "int64": { "schemaType": "INT64" }, "bigInt": { "schemaType": "BIGINT" }, "float32": { "schemaType": "FLOAT" }, "float64": { "schemaType": "DOUBLE" }, "string": { "schemaType": "VARCHAR" }, "bytes": { "schemaType": "BLOB" }, "decimal": { "schemaType": "DECIMAL" }, "localDate": { "schemaType": "DATE" }, "localTime": { "schemaType": "TIME" }, "localDateTime": { "schemaType": "DATETIME" }, "timestamp_in_long": { "schemaType": "TIMESTAMP" } } }, "recordType": "UPDATE", "postStruct": { "col1": 3, "col2": 129, "col3": 2147483646, "col4": 9223372036854775806, "col5": 10223372036854775806, "col6": 1.2222, "col7": 9.99999999999999909326625337248461995470488734032045693707225049331647881341002217023668530611028595157578301758491822824378438792553200763769833775473829862512856683413461939989729065436937279228852476622948659167943435544622149348072943613294167216662821737555414480159115639791276054897201420389770580351533960771506199055664889770260291710977826725024401716523031627390652604144008597950935492433262042405635563993262949691698930975461134804791235994697938405200089317860731205010159117711704697471514344499487123311264707354172378099538737850219826145102366279591379660471881259976727356521602405329789906247763521525981391443887618575275588619928089116905061711975308467857756405810961619074331866883961080943542712559830853980002984826569445431232452392578125E-308, "col8": "hello world 2020", "col9": "aGVsbG8gd29ybGQ=", "col10": 9.99999999999999909326625337248461995470488734032045693707225049331647881341002217023668530611028595157578301758491822824378438792553200763769833775473829862512856683413461939989729065436937279228852476622948659167943435544622149348072943613294167216662821737555414480159115639791276054897201420389770580351533960771506199055664889770260291710977826725024401716523031627390652604144008597950935492433262042405635563993262949691698930975461134804791235994697938405200089317860731205010159117711704697471514344499487123311264707354172378099538737850219826145102366279591379660471881259976727356521602405329789906247763521525981391443887618575275588619928089116905061711975308467857756405810961619074331866883961080943542712559830853980002984826569445431232452392578125E-308, "col11": "2020-11-25", "col12": "00:01:02", "col13": "2020-11-25 00:01:02", "col14": "1606233662.012345", "__light_type": { "int8": { "schemaType": "TINYINT" }, "int16": { "schemaType": "SMALLINT" }, "int32": { "schemaType": "INT" }, "int64": { "schemaType": "INT64" }, "bigInt": { "schemaType": "BIGINT" }, "float32": { "schemaType": "FLOAT" }, "float64": { "schemaType": "DOUBLE" }, "string": { "schemaType": "VARCHAR" }, "bytes": { "schemaType": "BLOB" }, "decimal": { "schemaType": "DECIMAL" }, "localDate": { "schemaType": "DATE" }, "localTime": { "schemaType": "TIME" }, "localDateTime": { "schemaType": "DATETIME" }, "timestamp_in_long": { "schemaType": "TIMESTAMP" } } } }
DELETE
(删除)数据的示例{ "allMetaData": { "checkpoint": null, "record_primary_key": "id1\u0001id2", "source_identity": null, "record_primary_value": "3\u0001129", "dbType": "OB_MYSQL", "table_name": "table", "db": "database", "timestamp": "1609344671" }, "prevStruct": { "col1": 3, "col2": 129, "col3": 2147483646, "col4": 9223372036854775806, "col5": 10223372036854775806, "col6": 1.2222, "col7": 9.99999999999999909326625337248461995470488734032045693707225049331647881341002217023668530611028595157578301758491822824378438792553200763769833775473829862512856683413461939989729065436937279228852476622948659167943435544622149348072943613294167216662821737555414480159115639791276054897201420389770580351533960771506199055664889770260291710977826725024401716523031627390652604144008597950935492433262042405635563993262949691698930975461134804791235994697938405200089317860731205010159117711704697471514344499487123311264707354172378099538737850219826145102366279591379660471881259976727356521602405329789906247763521525981391443887618575275588619928089116905061711975308467857756405810961619074331866883961080943542712559830853980002984826569445431232452392578125E-308, "col8": "hello world", "col9": "aGVsbG8gd29ybGQ=", "col10": 9.99999999999999909326625337248461995470488734032045693707225049331647881341002217023668530611028595157578301758491822824378438792553200763769833775473829862512856683413461939989729065436937279228852476622948659167943435544622149348072943613294167216662821737555414480159115639791276054897201420389770580351533960771506199055664889770260291710977826725024401716523031627390652604144008597950935492433262042405635563993262949691698930975461134804791235994697938405200089317860731205010159117711704697471514344499487123311264707354172378099538737850219826145102366279591379660471881259976727356521602405329789906247763521525981391443887618575275588619928089116905061711975308467857756405810961619074331866883961080943542712559830853980002984826569445431232452392578125E-308, "col11": "2020-11-25", "col12": "00:01:02", "col13": "2020-11-25 00:01:02", "col14": "1606233662.012345", "__light_type": { "int8": { "schemaType": "TINYINT" }, "int16": { "schemaType": "SMALLINT" }, "int32": { "schemaType": "INT" }, "int64": { "schemaType": "INT64" }, "bigInt": { "schemaType": "BIGINT" }, "float32": { "schemaType": "FLOAT" }, "float64": { "schemaType": "DOUBLE" }, "string": { "schemaType": "VARCHAR" }, "bytes": { "schemaType": "BLOB" }, "decimal": { "schemaType": "DECIMAL" }, "localDate": { "schemaType": "DATE" }, "localTime": { "schemaType": "TIME" }, "localDateTime": { "schemaType": "DATETIME" }, "timestamp_in_long": { "schemaType": "TIMESTAMP" } } }, "recordType": "DELETE", "postStruct": null }
DDL 示例
alter table connector_test.all_mysql_type_test add column c90 varchar(30) default "test" comment 'test';
{ "prevStruct": null, "postStruct": { "ddl": "alter table connector_test.all_mysql_type_test add column c90 varchar(30) default \"test\" comment 'test'", "__light_type": { "ddl": { "schemaType": "VAR_STRING" } } }, "allMetaData": { "checkpoint": "1671177200", "dbType": "OB_MYSQL", "storeDataSequence": null, "db": "connector_test", "timestamp": "1671177209", "uniqueId": null, "ddlType": "ALTER_TABLE", "record_primary_key": null, "source_identity": null, "record_primary_value": null, "table_name": "all_mysql_type_test" }, "recordType": "DDL" }
Debezium JSON 消息格式
同步 OceanBase 数据库 MySQL 租户的数据至 Kafka、DataHub(BLOB 类型)和 RocketMQ 时,序列化方式 Debezium 使用如下 JSON 消息格式,共包含两种,通常默认仅显示 payload
中的结构。
存在
schema
和payload
{ "schema": { //描述 payload 字段信息的结构体,默认没有该结构体 "type": "struct", //struct 表示该字段内部还有结构 "optional": false, //是否必须包含该字段 "fields": [ { "type": "int64", //字段的类型 "optional": false, //是否必须包含该字段 "field": "ts_ms" //字段的名称 } ] }, "payload": { "op": "c", //数据修改类型,包括 c(全量、插入)、u(更新)、d(删除)和 HEARTBEAT(心跳消息) "source": { "version": "", //OMS 的版本 "connector": "OB_MYSQL", //数据源的类型 "name": "OMS", //固定值 OMS "ts_ms": 0, //数据变更秒级时间戳,仅增量存在 "db": "test", //使用 SQL 语句进行变更的数据库的名称。如果是 OceanBase 数据库,仅存在数据库名称,无需租户名称 "table": "testTab", //使用 SQL 语句进行变更的表的名称 "pos": "553132@1668496109" //在 binlog 文件中的位置 [binlog 文件名]@[binlog 文件名 offset] }, "before": { //变更前镜像 "column": "value" //键值对,包含全量键值 }, "after": { //变更后镜像 "column": "value" // 键值对,包含全量键值 }, "ts_ms": 1668497367188 //数据处理时间戳 } }
仅存在
payload
{ "payload": { "op": "c", //数据修改类型,包括 c(全量、插入)、u(更新)、d(删除)和 HEARTBEAT(心跳消息) "source": { "version": "", //OMS 的版本 "connector": "OB_MYSQL", //数据源的类型 "name": "OMS", //固定值 OMS "ts_ms": 0, //数据变更秒级时间戳,仅增量存在 "db": "test", //使用 SQL 语句进行变更的数据库的名称。如果是 OceanBase 数据库,仅存在数据库名称,无需租户名称 "table": "testTab", //使用 SQL 语句进行变更的表的名称 "pos": "553132@16684****" //在 binlog 文件中的位置 [binlog 文件名]@[binlog 文件名 offset] }, "before": { //变更前镜像 "column": "value" //键值对,包含全量键值 }, "after": { //变更后镜像 "column": "value" // 键值对,包含全量键值 }, "ts_ms": 1668497367188 //数据处理时间戳 } }
数据示例如下:
INSERT
(插入)数据的示例{ "schema": { "optional": false, "type": "STRUCT", "fields": [ { "field": "before", "optional": false, "type": "struct", "fields": [ { "field": "c01", "optional": false, "type": "int32" }, { "field": "c02", "optional": false, "type": "string" }, { "field": "c03", "optional": false, "type": "string" }, { "field": "c04", "optional": false, "type": "bytes" }, { "field": "c05", "optional": false, "type": "int16" }, { "field": "c06", "optional": false, "type": "int16" }, { "field": "c07", "optional": false, "type": "int32" }, { "field": "c08", "optional": false, "type": "int64" }, { "field": "c09", "optional": false, "type": "float64" }, { "field": "c10", "optional": false, "type": "float64" }, { "field": "c11", "optional": false, "type": "string" }, { "field": "c12", "optional": false, "type": "string" }, { "field": "c13", "optional": false, "type": "string" }, { "field": "c14", "optional": false, "type": "string" }, { "field": "c15", "optional": false, "type": "bytes" }, { "field": "c16", "optional": false, "type": "string" }, { "field": "c17", "optional": false, "type": "bytes" }, { "field": "c18", "optional": false, "type": "bytes" }, { "field": "c19", "optional": false, "type": "bytes" }, { "field": "c20", "optional": false, "type": "bytes" }, { "field": "c21", "optional": false, "type": "string" }, { "field": "c22", "optional": false, "type": "int32" }, { "field": "c23", "optional": false, "type": "int64" }, { "field": "c24", "optional": false, "type": "string" }, { "field": "c25", "optional": false, "type": "int32" }, { "field": "c26", "optional": false, "type": "bytes" } ] }, { "field": "after", "optional": false, "type": "struct", "fields": [ { "field": "c01", "optional": false, "type": "int32" }, { "field": "c02", "optional": false, "type": "string" }, { "field": "c03", "optional": false, "type": "string" }, { "field": "c04", "optional": false, "type": "bytes" }, { "field": "c05", "optional": false, "type": "int16" }, { "field": "c06", "optional": false, "type": "int16" }, { "field": "c07", "optional": false, "type": "int32" }, { "field": "c08", "optional": false, "type": "int64" }, { "field": "c09", "optional": false, "type": "float64" }, { "field": "c10", "optional": false, "type": "float64" }, { "field": "c11", "optional": false, "type": "string" }, { "field": "c12", "optional": false, "type": "string" }, { "field": "c13", "optional": false, "type": "string" }, { "field": "c14", "optional": false, "type": "string" }, { "field": "c15", "optional": false, "type": "bytes" }, { "field": "c16", "optional": false, "type": "string" }, { "field": "c17", "optional": false, "type": "bytes" }, { "field": "c18", "optional": false, "type": "bytes" }, { "field": "c19", "optional": false, "type": "bytes" }, { "field": "c20", "optional": false, "type": "bytes" }, { "field": "c21", "optional": false, "type": "string" }, { "field": "c22", "optional": false, "type": "int32" }, { "field": "c23", "optional": false, "type": "int64" }, { "field": "c24", "optional": false, "type": "string" }, { "field": "c25", "optional": false, "type": "int32" }, { "field": "c26", "optional": false, "type": "bytes" } ] }, { "field": "source", "optional": false, "type": "struct", "fields": [ { "field": "version", "optional": false, "type": "string" }, { "field": "connector", "optional": false, "type": "string" }, { "field": "name", "optional": false, "type": "string" }, { "field": "ts_ms", "optional": false, "type": "int64" }, { "field": "db", "optional": false, "type": "string" }, { "field": "table", "optional": false, "type": "string" }, { "field": "server_id", "optional": false, "type": "int64" }, { "field": "pos", "optional": false, "type": "string" } ] }, { "field": "op", "optional": false, "type": "string" }, { "field": "ts_ms", "optional": false, "type": "int64" } ] }, "payload": { "op": "c", "source": { "connector": "OB_MYSQL", "pos": "703223@166849****", "name": "OMS", "version": "", "ts_ms": 1668491621000, "db": "test", "table": "table_name" }, "after": { "c11": "a", "c10": 2.4212412, "c13": "c", "c12": "b", "c15": "65", "c14": "d", "c17": "67", "c16": "f", "c19": "690000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000", "c18": "68", "c20": "6A", "c22": 19311, "c21": "2022-11-15T05:12:11Z", "c02": "12312", "c24": 1668489131000, "c01": 2, "c23": 36060000000, "c04": "61", "c26": "6B", "c03": "1241.41000", "c25": 2022, "c06": 141, "c05": 11, "c08": 412124124, "c07": 4241, "c09": 2.11111 }, "ts_ms": 1668495423594 } }
UPDATE
(更新)数据的示例{ "schema":{ "optional":false, "type":"STRUCT", "fields":[ { "field":"before", "optional":false, "type":"struct", "fields":[ { "field":"c01", "optional":false, "type":"int32" }, { "field":"c02", "optional":false, "type":"string" }, { "field":"c03", "optional":false, "type":"string" }, { "field":"c04", "optional":false, "type":"bytes" }, { "field":"c05", "optional":false, "type":"int16" }, { "field":"c06", "optional":false, "type":"int16" }, { "field":"c07", "optional":false, "type":"int32" }, { "field":"c08", "optional":false, "type":"int64" }, { "field":"c09", "optional":false, "type":"float64" }, { "field":"c10", "optional":false, "type":"float64" }, { "field":"c11", "optional":false, "type":"string" }, { "field":"c12", "optional":false, "type":"string" }, { "field":"c13", "optional":false, "type":"string" }, { "field":"c14", "optional":false, "type":"string" }, { "field":"c15", "optional":false, "type":"bytes" }, { "field":"c16", "optional":false, "type":"string" }, { "field":"c17", "optional":false, "type":"bytes" }, { "field":"c18", "optional":false, "type":"bytes" }, { "field":"c19", "optional":false, "type":"bytes" }, { "field":"c20", "optional":false, "type":"bytes" }, { "field":"c21", "optional":false, "type":"string" }, { "field":"c22", "optional":false, "type":"int32" }, { "field":"c23", "optional":false, "type":"int64" }, { "field":"c24", "optional":false, "type":"string" }, { "field":"c25", "optional":false, "type":"int32" }, { "field":"c26", "optional":false, "type":"bytes" } ] }, { "field":"after", "optional":false, "type":"struct", "fields":[ { "field":"c01", "optional":false, "type":"int32" }, { "field":"c02", "optional":false, "type":"string" }, { "field":"c03", "optional":false, "type":"string" }, { "field":"c04", "optional":false, "type":"bytes" }, { "field":"c05", "optional":false, "type":"int16" }, { "field":"c06", "optional":false, "type":"int16" }, { "field":"c07", "optional":false, "type":"int32" }, { "field":"c08", "optional":false, "type":"int64" }, { "field":"c09", "optional":false, "type":"float64" }, { "field":"c10", "optional":false, "type":"float64" }, { "field":"c11", "optional":false, "type":"string" }, { "field":"c12", "optional":false, "type":"string" }, { "field":"c13", "optional":false, "type":"string" }, { "field":"c14", "optional":false, "type":"string" }, { "field":"c15", "optional":false, "type":"bytes" }, { "field":"c16", "optional":false, "type":"string" }, { "field":"c17", "optional":false, "type":"bytes" }, { "field":"c18", "optional":false, "type":"bytes" }, { "field":"c19", "optional":false, "type":"bytes" }, { "field":"c20", "optional":false, "type":"bytes" }, { "field":"c21", "optional":false, "type":"string" }, { "field":"c22", "optional":false, "type":"int32" }, { "field":"c23", "optional":false, "type":"int64" }, { "field":"c24", "optional":false, "type":"string" }, { "field":"c25", "optional":false, "type":"int32" }, { "field":"c26", "optional":false, "type":"bytes" } ] }, { "field":"source", "optional":false, "type":"struct", "fields":[ { "field":"version", "optional":false, "type":"string" }, { "field":"connector", "optional":false, "type":"string" }, { "field":"name", "optional":false, "type":"string" }, { "field":"ts_ms", "optional":false, "type":"int64" }, { "field":"db", "optional":false, "type":"string" }, { "field":"table", "optional":false, "type":"string" }, { "field":"server_id", "optional":false, "type":"int64" }, { "field":"pos", "optional":false, "type":"string" } ] }, { "field":"op", "optional":false, "type":"string" }, { "field":"ts_ms", "optional":false, "type":"int64" } ] }, "payload":{ "op":"u", "before":{ "c11":"a", "c10":2.4212412, "c13":"c", "c12":"b", "c15":"65", "c14":"d", "c17":"67", "c16":"f", "c19":"690000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000", "c18":"68", "c20":"6A", "c22":19311, "c21":"2022-11-15T05:12:11Z", "c02":"12312", "c24":1668489131000, "c01":1, "c23":36060000000, "c04":"61", "c26":"6B", "c03":"1241.41000", "c25":2022, "c06":141, "c05":11, "c08":412124124, "c07":4241, "c09":2.11111 }, "source":{ "connector":"OB_MYSQL", "pos":"436999@166849****", "name":"OMS", "version":"", "ts_ms":1668495861000, "db":"test", "table":"table_name" }, "after":{ "c11":"aa", "c10":2.4212412, "c13":"c", "c12":"b", "c15":"65", "c14":"d", "c17":"67", "c16":"f", "c19":"690000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000", "c18":"68", "c20":"6A", "c22":19311, "c21":"2022-11-15T05:12:11Z", "c02":"12312", "c24":1668489131000, "c01":1, "c23":36060000000, "c04":"61", "c26":"6B", "c03":"1241.41000", "c25":2022, "c06":141, "c05":11, "c08":412124124, "c07":4241, "c09":2.11111 }, "ts_ms":1668495906356 } }
DELETE
(删除)数据的示例{ "schema":{ "optional":false, "type":"STRUCT", "fields":[ { "field":"before", "optional":false, "type":"struct", "fields":[ { "field":"c01", "optional":false, "type":"int32" }, { "field":"c02", "optional":false, "type":"string" }, { "field":"c03", "optional":false, "type":"string" }, { "field":"c04", "optional":false, "type":"bytes" }, { "field":"c05", "optional":false, "type":"int16" }, { "field":"c06", "optional":false, "type":"int16" }, { "field":"c07", "optional":false, "type":"int32" }, { "field":"c08", "optional":false, "type":"int64" }, { "field":"c09", "optional":false, "type":"float64" }, { "field":"c10", "optional":false, "type":"float64" }, { "field":"c11", "optional":false, "type":"string" }, { "field":"c12", "optional":false, "type":"string" }, { "field":"c13", "optional":false, "type":"string" }, { "field":"c14", "optional":false, "type":"string" }, { "field":"c15", "optional":false, "type":"bytes" }, { "field":"c16", "optional":false, "type":"string" }, { "field":"c17", "optional":false, "type":"bytes" }, { "field":"c18", "optional":false, "type":"bytes" }, { "field":"c19", "optional":false, "type":"bytes" }, { "field":"c20", "optional":false, "type":"bytes" }, { "field":"c21", "optional":false, "type":"string" }, { "field":"c22", "optional":false, "type":"int32" }, { "field":"c23", "optional":false, "type":"int64" }, { "field":"c24", "optional":false, "type":"string" }, { "field":"c25", "optional":false, "type":"int32" }, { "field":"c26", "optional":false, "type":"bytes" } ] }, { "field":"after", "optional":false, "type":"struct", "fields":[ { "field":"c01", "optional":false, "type":"int32" }, { "field":"c02", "optional":false, "type":"string" }, { "field":"c03", "optional":false, "type":"string" }, { "field":"c04", "optional":false, "type":"bytes" }, { "field":"c05", "optional":false, "type":"int16" }, { "field":"c06", "optional":false, "type":"int16" }, { "field":"c07", "optional":false, "type":"int32" }, { "field":"c08", "optional":false, "type":"int64" }, { "field":"c09", "optional":false, "type":"float64" }, { "field":"c10", "optional":false, "type":"float64" }, { "field":"c11", "optional":false, "type":"string" }, { "field":"c12", "optional":false, "type":"string" }, { "field":"c13", "optional":false, "type":"string" }, { "field":"c14", "optional":false, "type":"string" }, { "field":"c15", "optional":false, "type":"bytes" }, { "field":"c16", "optional":false, "type":"string" }, { "field":"c17", "optional":false, "type":"bytes" }, { "field":"c18", "optional":false, "type":"bytes" }, { "field":"c19", "optional":false, "type":"bytes" }, { "field":"c20", "optional":false, "type":"bytes" }, { "field":"c21", "optional":false, "type":"string" }, { "field":"c22", "optional":false, "type":"int32" }, { "field":"c23", "optional":false, "type":"int64" }, { "field":"c24", "optional":false, "type":"string" }, { "field":"c25", "optional":false, "type":"int32" }, { "field":"c26", "optional":false, "type":"bytes" } ] }, { "field":"source", "optional":false, "type":"struct", "fields":[ { "field":"version", "optional":false, "type":"string" }, { "field":"connector", "optional":false, "type":"string" }, { "field":"name", "optional":false, "type":"string" }, { "field":"ts_ms", "optional":false, "type":"int64" }, { "field":"db", "optional":false, "type":"string" }, { "field":"table", "optional":false, "type":"string" }, { "field":"server_id", "optional":false, "type":"int64" }, { "field":"pos", "optional":false, "type":"string" } ] }, { "field":"op", "optional":false, "type":"string" }, { "field":"ts_ms", "optional":false, "type":"int64" } ] }, "payload":{ "op":"d", "before":{ "c11":"aa", "c10":2.4212412, "c13":"c", "c12":"b", "c15":"65", "c14":"d", "c17":"67", "c16":"f", "c19":"690000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000", "c18":"68", "c20":"6A", "c22":19311, "c21":"2022-11-15T05:12:11Z", "c02":"12312", "c24":1668489131000, "c01":1, "c23":36060000000, "c04":"61", "c26":"6B", "c03":"1241.41000", "c25":2022, "c06":141, "c05":11, "c08":412124124, "c07":4241, "c09":2.11111 }, "source":{ "connector":"OB_MYSQL", "pos":"553132@1668****", "name":"OMS", "version":"", "ts_ms":1668496109000, "db":"test", "table":"table_name" }, "ts_ms":1668496119717 } }
DebeziumFlatten JSON 消息格式
同步 OceanBase 数据库 MySQL 租户的数据至 Kafka、DataHub(BLOB 类型)和 RocketMQ 时,序列化方式 DebeziumFlatten 的 JSON 消息格式如下所示,和序列化方式 Debezium 相比,不再填充 schema
和 payload
。
{
"op": "c", //数据修改类型,包括 c(全量、插入)、u(更新)、d(删除)和 HEARTBEAT(心跳消息)
"source": {
"version": "", //OMS 的版本
"connector": "OB_MYSQL", //数据源的类型
"name": "OMS", //固定值 OMS
"ts_ms": 0, //数据变更秒级时间戳,仅增量存在
"db": "test", //使用 SQL 语句进行变更的数据库的名称。如果是 OceanBase 数据库,仅存在数据库名称,无需租户名称
"table": "testTab", //使用 SQL 语句进行变更的表的名称
"pos": "553132@16684****" //在 binlog 文件中的位置 [binlog 文件名]@[binlog 文件名 offset]
},
"before": { //变更前镜像
"column": "value" //键值对,包含全量键值
},
"after": { //变更后镜像
"column": "value" // 键值对,包含全量键值
},
"ts_ms": 1668497367188 //数据处理时间戳
}
数据示例如下:
INSERT
(插入)数据的示例{ "op":"c", "source":{ "connector":"OB_MYSQL", "pos":"703223@166849****", "name":"OMS", "version":"", "ts_ms":1668491621000, "db":"test", "table":"table_name" }, "after":{ "c11":"a", "c10":2.4212412, "c13":"c", "c12":"b", "c15":"65", "c14":"d", "c17":"67", "c16":"f", "c19":"690000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000", "c18":"68", "c20":"6A", "c22":19311, "c21":"2022-11-15T05:12:11Z", "c02":"12312", "c24":1668489131000, "c01":2, "c23":36060000000, "c04":"61", "c26":"6B", "c03":"1241.41000", "c25":2022, "c06":141, "c05":11, "c08":412124124, "c07":4241, "c09":2.11111 }, "ts_ms":1668495423594 }
UPDATE
(更新)数据的示例{ "op":"u", "before":{ "c11":"a", "c10":2.4212412, "c13":"c", "c12":"b", "c15":"65", "c14":"d", "c17":"67", "c16":"f", "c19":"690000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000", "c18":"68", "c20":"6A", "c22":19311, "c21":"2022-11-15T05:12:11Z", "c02":"12312", "c24":1668489131000, "c01":1, "c23":36060000000, "c04":"61", "c26":"6B", "c03":"1241.41000", "c25":2022, "c06":141, "c05":11, "c08":412124124, "c07":4241, "c09":2.11111 }, "source":{ "connector":"OB_MYSQL", "pos":"436999@166849****", "name":"OMS", "version":"", "ts_ms":1668495861000, "db":"test", "table":"table_name" }, "after":{ "c11":"aa", "c10":2.4212412, "c13":"c", "c12":"b", "c15":"65", "c14":"d", "c17":"67", "c16":"f", "c19":"690000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000", "c18":"68", "c20":"6A", "c22":19311, "c21":"2022-11-15T05:12:11Z", "c02":"12312", "c24":1668489131000, "c01":1, "c23":36060000000, "c04":"61", "c26":"6B", "c03":"1241.41000", "c25":2022, "c06":141, "c05":11, "c08":412124124, "c07":4241, "c09":2.11111 }, "ts_ms":1668495906356 }
DELETE
(删除)数据的示例{ "op":"d", "before":{ "c11":"aa", "c10":2.4212412, "c13":"c", "c12":"b", "c15":"65", "c14":"d", "c17":"67", "c16":"f", "c19":"690000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000", "c18":"68", "c20":"6A", "c22":19311, "c21":"2022-11-15T05:12:11Z", "c02":"12312", "c24":1668489131000, "c01":1, "c23":36060000000, "c04":"61", "c26":"6B", "c03":"1241.41000", "c25":2022, "c06":141, "c05":11, "c08":412124124, "c07":4241, "c09":2.11111 }, "source":{ "connector":"OB_MYSQL", "pos":"553132@1668****", "name":"OMS", "version":"", "ts_ms":1668496109000, "db":"test", "table":"table_name" }, "ts_ms":1668496119717 }
DebeziumSmt JSON 消息格式
DebeziumSmt 是 Debezium 提供的一种配置方式,使用事件扁平化单消息转换(Single Message Transform,SMT)对单条信息进行转换和处理。同步 OceanBase 数据库 MySQL 租户的数据至 Kafka、DataHub(BLOB 类型)和 RocketMQ 时,序列化方式 DebeziumSmt 的 JSON 消息格式仅显示 after
中的 key:value
。
例如,使用序列化方式 Debezium 更新数据:
{
"op": "u",
"source": {
"connector": "OB_MYSQL",
"name": "OMS"
},
"ts_ms": 1668496119717,
"before": {
"field1": "before_value1",
"field2": "before_value2"
},
"after": {
"field1": "after_value1",
"field2": "after_value2"
}
}
SMT 对上述示例的消息进行处理后,简化了消息格式。即使用序列化方式 DebeziumSmt,JSON 消息格式如下所示。
{
"field1": "after_value1",
"field2": "after_value2"
}
数据示例如下:
INSERT
(插入)数据的示例{ "field1": "after_value1", "field2": "after_value2", "__deleted": "false" }
UPDATE
(更新)数据的示例{ "field1": "after_value1", "field2": "after_value2", "__deleted": "false" }
DELETE
(删除)数据的示例{ "field1": "after_value1", "field2": "after_value2", "__deleted": "true" }
Avro JSON 消息格式
同步 OceanBase 数据库 MySQL 租户的数据至 Kafka 时,序列化方式 Avro 使用如下 JSON 消息格式。
全量迁移
{ "version": 1, "id": 0, "sourceTimestamp": 1702371565, // 时间戳安全位点。 "sourcePosition": "", // 全量迁移无 position 等信息。 "safeSourcePosition": "", "sourceTxid": "", "source": { "sourceType": "MySQL", // 固定值 MySQL。 "version": "OBMySQL" // 固定值 OBMySQL。 }, "operation": "INIT", // 全量类型为 INIT。 "objectName": "test***", "processTimestamps": [ 1702371565238 ], // 只有投递时间。 "tags": { "pk_uk_info": "{\"PRIMARY\":[\"id\"]}" // 只有主键类型。 }, "fields": [ { "name": "id", "dataTypeNumber": 246 }, // 每个列的类型。 { "name": "bid", "dataTypeNumber": 3 }, { "name": "name", "dataTypeNumber": 15 }, { "name": "address", "dataTypeNumber": 254 } ], "beforeImages": null, // 全量迁移前镜像为空。 "afterImages": [ // 后镜像。INTEGER 类型的 precision 衡为 8,FLOAT 类型的 precision 衡为 8、scale 衡为 64。 { "value": "1", "precision": 1, "scale": 0 }, { "precision": 8, "value": "11" }, { "charset": "utf8mb4", "value": { "bytes": "yyy" } }, null ] }
增量同步 DML
INSERT
(插入)数据的示例{ "version": 1, "id": 170236922143600000, "sourceTimestamp": 1702369092, "sourcePosition": "1702369080", // OceanBase 数据库 MySQL 租户的 checkpoint。 "safeSourcePosition": "1702369080", // OceanBase 数据库 MySQL 租户的 checkpoint。 "sourceTxid": "", "source": { "sourceType": "MySQL", "version": "OBMySQL" }, "operation": "INSERT", "objectName": "test***", "processTimestamps": [1702369221480], "tags": { "pk_uk_info": "{\"PRIMARY\":[\"id\"]}" }, "fields": [ {"name": "id", "dataTypeNumber": 8}, {"name": "bid", "dataTypeNumber": 3}, {"name": "name", "dataTypeNumber": 15} ], "beforeImages": null, // INSERT 前镜像为空。 "afterImages": [ {"precision": 8, "value": "2"}, {"precision": 8, "value": "12"}, {"charset": "utf8mb4", "value": {"bytes": "xxx"} } ] }
UPDATE
(更新)数据的示例{ "version": 1, "id": 170236975822100001, "sourceTimestamp": 1702369757, "sourcePosition": "1702369756", "safeSourcePosition": "1702369756", "sourceTxid": "", "source": { "sourceType": "MySQL", "version": "OBMySQL" }, "operation": "UPDATE", "objectName": "test***", "processTimestamps": [1702369758237], "tags": { "pk_uk_info": "{\"PRIMARY\":[\"id\"]}" }, "fields": [ {"name": "id", "dataTypeNumber": 8}, {"name": "bid", "dataTypeNumber": 3}, {"name": "name", "dataTypeNumber": 15} ], "beforeImages": [ // UPDATE 存在前镜像和后镜像。 {"precision": 8, "value": "3"}, {"precision": 8, "value": "22"}, {"charset": "utf8mb4", "value": {"bytes": "xxx"}} ], "afterImages": [ {"precision": 8, "value": "3"}, {"precision": 8, "value": "44"}, {"charset": "utf8mb4", "value": {"bytes": "xxx"}} ] }
DELETE
(删除)数据的示例{ "version": 1, "id": 170236976527500000, "sourceTimestamp": 1702369764, "sourcePosition": "1702369763", "safeSourcePosition": "1702369763", "sourceTxid": "", "source": { "sourceType": "MySQL", "version": "OBMySQL" }, "operation": "DELETE", "objectName": "test***", "processTimestamps": [1702369765287], "tags": { "pk_uk_info": "{\"PRIMARY\":[\"id\"]}" }, "fields": [ {"name": "id", "dataTypeNumber": 8}, {"name": "bid", "dataTypeNumber": 3}, {"name": "name", "dataTypeNumber": 15} ], "beforeImages": [ {"precision": 8, "value": "3"}, {"precision": 8, "value": "44"}, {"charset": "utf8mb4", "value": {"bytes": "xxx"}} ], "afterImages": null // DELETE 后镜像为空。 }
增量同步 DDL
{ "version": 1, "id": 170236979372400000, "sourceTimestamp": 1702369793, "sourcePosition": "1702369792", "safeSourcePosition": "1702369792", "sourceTxid": "", "source": { "sourceType": "MySQL", "version": "OBMySQL" }, "operation": "DDL", "objectName": "test***", "processTimestamps": [ 1702369794543 ], "tags": {}, "fields": null, // 增量同步 DDL 无 fields 和 beforeImages。 "beforeImages": null, "afterImages": "alter table multi_db_multi_tbl add column address char(20) default null" // STRING 类型的 afterImages 为 DDL 语句。 }
数据库传输到文本协议的格式说明
同步 OceanBase 数据库的数据至 Kafka、DataHub(BLOB 类型)和 RocketMQ 时:
如果序列化方式为 Default、Canal、DataWorks(支持 2.0 版本)、SharePlex 或 DefaultExtendColumnType,OceanBase 数据库两种租户对应的映射说明如下。
OceanBase 数据库 MySQL 租户
数据类型
映射类型
描述
TINYINT
SMALLINT
MEDIUMINT
INT
INTEGER
YEAR
BOOL
BOOLEAN
Long
64 位以下的整型。
正常数值,例如 1000,不使用科学计数法。
对于 BOOL/BOOLEAN,则 true = 1,false = 0。
DECIMAL
NUMERIC
BigDecimal
精确小数数值类型以及超过 64 位的整型。
对于整型数值不会展示小数点及小数。
对于存在小数的数值,会根据数据库传入的数据进行位数展示,不会去除末尾的 0,使用科学计数法。
FLOAT
DOUBLE
Double
浮点数
根据源端是 FLOAT 或 DOUBLE 类型决定有效位数。FLOAT 是 7 位有效位数,DOUBLE 是 16 位有效位数。
CHAR
VARCHAR
TINYTEXT
TEXT
MEDIUMTEXT
LONGTEXT
ENUM
SET
String
字符串。
TINYBLOB
BLOB
MEDIUMBLOB
LONGBLOB
BINARY
VARBINARY
BIT
Bytes
字节数组,默认以 BASE64 编码展示。
说明对于 BIT 定长的类型,增量接收到字节数组之后会将高位 0 去除,但是全量不会,所以看到的 BASE64 编码可能会不一致。但是实际结果一致,解码之后结果一致。
DATE
Date
日期类型,格式为
YYYY-MM-DD
。 如果是非法时间,会显示原有字符串。TIME
Time
时间类型,格式为
HH:mm:ss[.nnnnnnnnn]
。低于秒级的时间最多展示 9 位。如果是低于秒级的时间,会显示出所有非 0 的数值。如果是非法时间,会显示原有字符串。
DATETIME
DateTime
日期时间类型,包括时区。格式为
YYYY-MM-DD HH:mm:ss[.nnnnnnnnn] [zoneId]
。低于秒级的时间最多展示 9 位。如果是低于秒级的时间,会显示出所有非 0 的数值。如果是非法时间,会显示原有字符串。
TIMESTAMP
Timestamp
时间戳类型,格式为
[秒级时间戳][.nnnnnnnnn]
。低于秒级的时间最多展示 9 位。如果是低于秒级的时间,会显示出所有非 0 的数值。如果是非法时间,会以 0000-00-00 00:00:00 格式显示。
OceanBase 数据库 Oracle 租户
数据类型
映射类型
描述
INTEGER
Long
64 位以下的整型。
正常数值,例如 1000,不使用科学计数法。
NUMBER
FLOAT
BigDecimal
精确小数数值类型以及超过 64 位的整型。
BINARY_FLOAT BINARY_DOUBLE
Double
浮点数
根据源端是 FLOAT 或 DOUBLE 类型决定有效位数。FLOAT 是 7 位有效位数,DOUBLE 是 16 位有效位数。
VARCHAR2
NVARCHAR2
INTERVAL YEAR TO MOTH
INTERVAL DAY TO SECOND
CLOB
NCLOB
ROWID
UROWID
String
字符串
BLOB
BFILE
RAW
Bytes
字节数组
默认以 BASE64 编码展示。
DATE
TIMESTAMP
TIMESTAMP WITH TIME ZONE
TIMESTAMP WITH LOCAL TIME ZONE
DateTime
日期时间类型,包括时区。格式为
YYYY-MM-DD HH:mm:ss[.nnnnnnnnn] [zoneId]
。低于秒级的时间最多展示 9 位。如果是低于秒级的时间,会显示出所有非 0 的数值。如果是非法时间,会显示原有字符串。
如果序列化方式为 Debezium,OceanBase 数据库 MySQL 租户对应的映射说明如下。
重要同步 OceanBase 数据库 Oracle 租户的数据至 Kafka、DataHub(BLOB 类型)和 RocketMQ 时,不支持选择序列化方式 Debezium。
数据类型
映射类型
描述
BOOLEAN
BOOL
BOOLEAN
取值包括 true 和 false。
TINYINT
SMALLINT
MEDIUMINT
INT/INTEGER
BIGINT
YEAR
LONG
-263 ~ 263范围的整型。
BIGINT
STRING
使用字符串完整展示数据。
FLOAT
DOUBLE
DOUBLE
浮点数。
DECIMAL
NUMERIC
STRING
使用字符串完整展示数据。对于存在小数的数值,会根据数据库传入的数据进行位数展示,不会去除末尾的 0,使用科学计数法。
BIT
BINARY
VARBINARY
TINYBLOB
BLOB
MEDIUMBLOB
LONGBLOB
BYTES
字节数组,base16 编码。
CHAR
VARCHAR
TINYTEXT
TEXT
MEDIUMTEXT
LONGTEXT
ENUM
SET
STRING
字符串。
TIMESTAMP
STRING
格式为 YYYY-MM-DDTHH:mm:ss[.nnnnnnnnn]Z,时区为 0 时区。
DATE
LONG
表示自 1970-01-01 以来的天数。
TIME
LONG
表示自 00:00:00 以来的时间值(以微秒为单位),不包括时区信息。
DATETIME
LONG
表示自 1970-01-01 00:00:00 以来的毫秒数,不包括时区信息。
如果序列化方式为 Avro,OceanBase 数据库 MySQL 租户对应的映射说明如下。
重要仅同步 OceanBase 数据库 MySQL 租户的数据至 Kafka 时,支持选择序列化方式 Avro。
类型名称
映射类型
TINYINT
BOOLEAN
SMALLINT
MEDIUMINT
INT
BIGINT
BIT
INTEGER
FLOAT
DOUBLE
FLOAT
DECIMAL
NUMERIC
DECIMAL
VARCHAR
CHAR
TINYTEXT
MEDIUMTEXT
LONGTEXT
TEXT
CHARACTER
BINARY
VARBINARY
TINYBLOB
MEDIUMBLOB
LONGBLOB
BLOB
BinaryObject
TIMESTAMP
TimestampObject
说明对于 TIMESTAMP 类型,全量和增量均会转换至时间戳,非法时间为
-9223372022400L
。除非法时间外,您可以使用 Java 的
Instant.ofEpochSecond(ts, nanos)
方法获取正确的时间。DATE
TIME
DATETIME
YEAR
DATETIME
JSON
ENUM
SET
TextObject
GEOMETRY
TextGeometry
说明目前数据传输使用 EWKT 格式透传,所以映射为 TextGeometry 类型。