# 将数据库中数据写入Elasticsearch中
本示例主要演示从Mysql数据库中读取数据,写入Elasticsearch中。
主要步骤如下:
# 准备数据
1、在Mysql数据库中创建表
DROP TABLE IF EXISTS `orderinfo`;
CREATE TABLE `orderinfo` (
`ordercode` varchar(32) CHARACTER SET utf8mb3 COLLATE utf8mb3_general_ci NOT NULL COMMENT '订单号',
`amount` int NULL DEFAULT NULL COMMENT '数量',
`price` varchar(32) CHARACTER SET utf8mb3 COLLATE utf8mb3_general_ci NULL DEFAULT NULL COMMENT '价格',
`manufacturer` varchar(32) CHARACTER SET utf8mb3 COLLATE utf8mb3_general_ci NULL DEFAULT NULL COMMENT '厂商',
`createtime` date NULL DEFAULT NULL COMMENT '创建时间',
`createuser` varchar(32) CHARACTER SET utf8mb3 COLLATE utf8mb3_general_ci NULL DEFAULT NULL COMMENT '创建人',
`updatetime` timestamp NULL DEFAULT NULL COMMENT '更新时间',
PRIMARY KEY (`ordercode`)
);
-- ----------------------------
-- Records of orderinfo
-- ----------------------------
INSERT INTO `orderinfo` VALUES ('001', 1000, '5000', 'xingyun', '2018-12-12', 'zhangcong', '2018-12-13 00:00:00');
INSERT INTO `orderinfo` VALUES ('002', 500, '1000', '西安金属', '2018-12-12', '王强', '2018-12-15 00:00:00');
INSERT INTO `orderinfo` VALUES ('003', 1500, '7000', '枫禾林木', '2018-12-12', '魏雨', '2018-12-13 12:00:00');
INSERT INTO `orderinfo` VALUES ('004', 20000, '75000', '星峰', '2018-12-12', '王林', '2018-12-13 00:00:00');
INSERT INTO `orderinfo` VALUES ('005', 1300, '8000', '天威科技', '2018-12-12', '王林', '2019-12-01 00:00:00');
INSERT INTO `orderinfo` VALUES ('006', 2000, '5000', '微云科', '2018-12-12', '王林2', '2019-12-01 00:20:11');
INSERT INTO `orderinfo` VALUES ('007', 3000, '9000', '西安金属', '2018-12-12', '王强', '2023-11-11 11:11:11');
INSERT INTO `orderinfo` VALUES ('008', 2300, '11589', '星峰', '2018-12-12', '王强', '2020-01-01 00:00:00');
INSERT INTO `orderinfo` VALUES ('009', 5100, '12909', '0星云制造', '2018-12-12', '魏雨', '2018-12-13 00:00:00');
INSERT INTO `orderinfo` VALUES ('011', 1800, '5500', '00微云科技', '2022-12-31', '魏雨', '2023-01-04 10:11:22');
INSERT INTO `orderinfo` VALUES ('013', 1302, '5302', '天威科技', '2024-05-24', '魏雨', '2024-05-23 16:59:00');
INSERT INTO `orderinfo` VALUES ('014', 1300, '5400', '天威科技', '2024-05-22', '魏雨', '2024-05-23 17:09:00');
INSERT INTO `orderinfo` VALUES ('020', 200, '200', '天威科技', '2024-05-22', '魏雨1', '2024-05-23 17:09:00');
INSERT INTO `orderinfo` VALUES ('1111', 1111, '1111', '1111', '2024-08-22', '1111', '2024-08-22 15:30:01');
# 新建同步作业
点击数据同步上的【...】,选择弹出菜单【新建数据同步作业】,作业名称为:elasticSink。
# 拖拽图元
依次拖拽数据源中的JDBC Source组件和目标中的Elasticsearch Sink组件,依次连线。如下图所示:
# 配置组件属性
1、双击"JDBC Source"组件,根据下图所示步骤依次配置。
SQL语句:
select * from dws_source_dev.orderinfo
2、双击"Elasticsearch Sink"组件,根据下图所示步骤依次配置。
3、Ctrl+S保存该模型。
# 运行
点击【运行】按钮,可以运行已经开发完毕的场景,在日志栏可以看运行日志及运行结果。
# 查看数据
使用kibana查询orderinfo2索引中的数据。
# 提交版本
当草稿运行正常后,点击【提交】按钮可以将该版本提交到作业调度,每次修改提交都会生成新的版本,可以看到提交的历史版本,并可以随意切换版本。
提交后的版本,可以在作业调度中进行"定时"调度配置。