# Elasticsearch 到 MySQL 的数据同步示例
本示例主要介绍 Elasticsearch 到 MySQL 的数据同步示例场景开发,该场景根据 Elasticsearch 接收到的数据内容向 MySQL 数据源的表 userinfo 插入、更新数据。主要步骤如下:
# 准备数据
在 MySQL数据源中创建一个表 userinfo,用于写入ES数据
CREATE TABLE userinfo (
usercode varchar(64) NOT NULL,
username varchar(64) DEFAULT NULL,
sex varchar(64) DEFAULT NULL,
age int(11) DEFAULT NULL,
unitcode varchar(64) DEFAULT NULL,
deptcode varchar(64) DEFAULT NULL,
telephone varchar(64) DEFAULT NULL,
createtime datetime DEFAULT NULL,
updatetime datetime DEFAULT NULL,
PRIMARY KEY (usercode)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
INSERT INTO `dws_front_dev`.`userinfo` (`usercode`, `username`, `sex`, `age`, `unitcode`, `deptcode`, `telephone`, `createtime`, `updatetime`) VALUES ('001', '张聪', '男', 20, 'test', 'test', '15091345789', '2018-12-13 00:00:00', '2018-12-13 00:00:00');
在 Elasticsearch数据源中插入数据。具体如下:
curl -H "Content-Type: application/json" \
-X POST 'https://elastic:primeton000000@192.168.16.211:9200/user/_bulk' -d '
{ "index": { "_index":"user","_id": "1" }}
{ "usercode": "001", "username": "张聪", "sex": "男", "age": 35, "unitcode": "普元信息", "deptcode": "技术部", "telephone": "15091345789", "createtime": "2018-12-13 00:00:00" , "updatetime": "2023-12-18 12:00:10"}
{ "index": { "_index":"user","_id": "2" }}
{ "usercode": "002", "username": "王强", "sex": "男", "age": 25, "unitcode": "普元信息", "deptcode": "售后", "telephone": "15309872351", "createtime": "2018-12-13 23:59:59" , "updatetime": "2024-01-18 12:30:00"}
' -k
https://elastic:primeton000000@192.168.16.211:9200/user/_bulk
修改为自己使用的es数据源信息,user索引_index 。
# 新建实时作业
点击资源树节点上的【...】,选择弹出菜单【新建实时作业】,填写作业名称,点击【确定】按钮。
# 画布中拖拽图元
在画布中分别拖拽 1 个"Elasticsearch数据源"图元、1 个"关系型数据库目标"图元,并建立连线。
# 配置"Elasticsearch数据源"组件属性
在"Elasticsearch数据源"图元上右键,点击【编辑】按钮,弹出"Elasticsearch数据源"组件的弹窗。按照需求进行属性设置,点击【确定】按钮。
# 配置"关系型数据库目标"组件属性
在"关系型数据库目标"图元上右键,点击【编辑】按钮,弹出"关系型数据库目标"组件的弹窗。设置必填的属性,启用【是否启用upsert语法支持】选项并设置主键字段usercode
,其他属性默认。
# 通用配置
在通用配置中可以配置任务优先级、Worker 分组、本地参数、超时告警、部署方式、引擎参数。 修改属性后请务必点击【确定】按钮。
# 保存草稿
如果所有组件属性都已设置完毕,点击【保存】按钮,可以看到保存过的历史草稿,并可以随意切换草稿。(草稿只保存最近 10 个)
可以参考示例关系型表数据同步示例 中的"保存草稿"说明。
# 运行
点击【运行】按钮,可以运行已经开发完毕的场景,在日志栏可以看运行日志及运行结果。
可以在 MySQL数据库表 userinfo 中看到usercode为001的值被更新,新插入数据usercode为002。
# 提交版本
当草稿运行正常后,点击【提交】按钮可以将该版本提交到作业调度,每次修改提交都会生成新的版本,可以看到提交的历史版本,并可以随意切换版本。
提交后的版本,可以在作业调度中进行"定时"调度配置。
可以参考示例关系型表数据同步示例 中的"提交版本"说明。