当前位置: 首页 > news >正文

定制网站建设加盟代理湖南网络推广服务

定制网站建设加盟代理,湖南网络推广服务,常规网站建设内容,黑龙江建设厅官网1.引言 业务程序经常会通过各式各样的缓存来提升用户的访问速度。 由于存在缓存,在一些实时性要求较高的场景中,需要在数据变更的同时将数据缓存进行更新或删除。 如果数据本身由其他业务部门提供,就无法在写入的同时做缓存的一致性处理。…

1.引言

业务程序经常会通过各式各样的缓存来提升用户的访问速度。

由于存在缓存,在一些实时性要求较高的场景中,需要在数据变更的同时将数据缓存进行更新或删除。

如果数据本身由其他业务部门提供,就无法在写入的同时做缓存的一致性处理。

此时,可以通过其他业务部门暴露数据变更通知来感知到数据变化,从而保证数据的更新及时性。

TiCDC 是一款 TiDB 增量数据同步工具,通过拉取上游 TiKV 的数据变更日志,TiCDC 可以将数据解析为有序的行级变更数据输出到下游。

因此,可以通过 TiCDC 将数据变更通知暴露给业务程序,来让业务程序做及时的对应处理逻辑。

本文以一张用户表的数据变更为例,来展示 Java 服务端接收一条 TiCDC Canal-JSON 的消息变更,解析数据,并转发给对应的业务处理程序的流程。

之前写类似的程序时,网上搜索到的案例还是比较少的,本文仅抛砖引玉,欢迎各位大佬批评指正!

2. 代码思路

(1)通过 kafka 消息获取 CDC 消息

(2)解析 CDC 消息,判断其数据变更类型,执行对应的处理逻辑

3. 代码实现

3.1 代码结构

cdc-demo
└─ src└─ main└─ java└─ com.example.demo├─ constants│    └─ CdcConstants.java├─ dto│    ├─ CdcMessage.java│    └─ User.java├─ job│    ├─ CdcJob.java│    └─ UserCdcJob.java└─ service├─ impl│   └─ UserServiceImpl.java└─ CdcService.java

3.2 CDC 常量类

public class CdcConstants {
​public enum MessageType {/*** 插入操作*/INSERT,
​/*** 更新操作*/UPDATE,
​/*** 删除操作*/DELETE;}
}

3.3 实体类

3.3.1 用户实体类

@Getter
@Setter
public class User {
​/*** 用户id*/private Long id;
​/*** 用户名*/private String name;
​/*** 年龄*/private Integer age;
}

3.3.2 CDC 消息实体类

@Getter
@Setter
public class CdcMessage<T> {
​/*** 数据集合*/private List<T> data;
​/*** 数据库名称*/private String database;
​/*** 是否为DDL语句isDdl*/private boolean isDdl;
​/*** 表结构的类型字段(值为字段类型,如varchar)*/private T mysqlType;
​/*** UPDATE类型下的旧数据(未变更字段无数据)*/private List<T> oldData;
​/*** sql语句*/private String sql;
​/*** 值为int类型*/private T sqlType;
​/*** 数据表名*/private String table;
​/*** 新增(INSERT)、更新(UPDATE)、删除(DELETE)、删除表(ERASE)等*/private String type;
}

3.4 任务类

3.4.1 CDC 任务基类

@Slf4j
public class CdcJob<T> {
​protected CdcService<T> cdcService;
​/*** 处理消息** @param record 消息记录* @param ack    消息处理标识*/public void handleMessage(ConsumerRecord<String, String> record, Acknowledgment ack) {String recordString = String.format("topic:%s,partition:%s,offset:%s,value:%s",record.topic(),record.partition(),record.offset(),record.value());log.info("数据更新开始处理,消息:" + recordString);try {boolean processResult = process(record);String processResultString = processResult ? "成功" : "无更新";log.info("数据更新处理结束,处理结果:" + processResultString);} catch (Exception e) {log.error("数据更新报错,", e);} finally {// 手动提交偏移量ack.acknowledge();}}
​/*** 处理数据** @param record kafka消费记录* @return 处理结果*/public boolean process(ConsumerRecord<String, String> record) {String bizName = this.getClass().getSimpleName();// 服务为初始化报错if (null == cdcService) {throw new IllegalStateException("服务未初始化");}
​// 解析消息CdcMessage<T> cdcMessage = JSON.parseObject(record.value(), new TypeReference<CdcMessage<T>>() {});
​// 跳过DDLif (cdcMessage.isDdl()) {log.info(bizName, "DDL变更,无需处理");return false;}// 处理结果初始化boolean result = false;// 服务层处理数据List<T> dataList = cdcMessage.getData();if (CdcConstants.MessageType.INSERT.name().equals(cdcMessage.getType())) {result = cdcService.insert(dataList);} else if (CdcConstants.MessageType.UPDATE.name().equals(cdcMessage.getType())) {result = cdcService.update(cdcMessage.getOldData(), dataList);} else if (CdcConstants.MessageType.DELETE.name().equals(cdcMessage.getType())) {result = cdcService.delete(dataList);} else {log.warn(bizName, "不处理该消息,消息类型:" + cdcMessage.getType());}return result;}
}

3.4.2 用户表 CDC 消费任务类

@Component
public class UserCdcJob extends CdcJob<User> {
​public UserCdcJob(UserServiceImpl userService) {this.cdcService = userService;}
​/*** 消费CDC消息,并进行处理** @param record 消息记录* @param ack    消息处理标识*/@KafkaListener(id = "UserCdcJob", groupId = "${user-cdc.group}",topics = {"${user-cdc.topic}"}, containerFactory = "cdcKafkaListenerFactory")public void consumer(ConsumerRecord<String, String> record, Acknowledgment ack) {handleMessage(record, ack);}
}

3.5 服务类

3.5.1 CDC 消息处理服务接口

public interface CdcService<T> {
​/*** 插入数据** @param data 数据* @return 插入结果*/boolean insert(List<T> data);
​/*** 更新数据** @param oldData 更新前数据* @param newData 更新后数据* @return 更新结果*/boolean update(List<T> oldData, List<T> newData);
​/*** 删除数据** @param data 数据* @return 删除数据*/boolean delete(List<T> data);
}

3.5.2 用户服务实现类

@Service
public class UserServiceImpl implements CdcService<User> {
​@Overridepublic boolean insert(List<User> data) {// TODOreturn false;}
​@Overridepublic boolean update(List<User> oldData, List<User> newData) {// TODOreturn false;}
​@Overridepublic boolean delete(List<User> data) {// TODOreturn false;}
}

4.参考文档

TiCDC 简介:https://docs.pingcap.com/zh/tidb/stable/ticdc-overview

TiCDC Canal-JSON 协议:https://docs.pingcap.com/zh/tidb/stable/ticdc-canal-json

http://www.hengruixuexiao.com/news/47215.html

相关文章:

  • 网络设计报告书安卓优化大师历史版本
  • 科技网站设计公司新闻软文广告
  • 山东省住房城乡建设厅门户网站网店运营怎么学
  • 网站建设与维护成绩查询百度商家版下载
  • 深圳网站建设 独搜索引擎关键词优化方案
  • 天津科技公司网站网站如何宣传推广
  • wordpress访客统计插件seo网站推广seo
  • 合肥网页设计兼职绍兴seo排名外包
  • dw做网站有雪花效果百度权重是什么
  • A级网站建设服务百度推广怎么看关键词排名
  • 如何上传视频到公司网站推广页面制作
  • 黑龙江做网站的公司广告投放方案
  • 网站做成app需要多少钱网易疫情实时最新数据
  • 沂水做网站教育培训机构招生方案
  • 德国 网站后缀制作网站的软件有哪些
  • 济宁专业网站建设网站优化seo方案
  • 汽车网站代码广告设计公司
  • 做网站是否过时了网络营销理论包括哪些
  • 大连网站推广价格三只松鼠口碑营销案例
  • 单一产品做网站排名优化公司哪家好
  • 怎么做展示网站app营销策略有哪些
  • 做网站的无锡seo关键词排名技术
  • 杭州哪里找网站建设的兼职建网站的步骤
  • 网页设计师个人网站永久8x的最新域名
  • 外贸网站建设公司流程重庆高端seo
  • 网站正在备案中鸡西seo
  • 德化住房和城乡建设网站长沙正规seo优化价格
  • 男女激烈做羞羞事网站福州网站关键词推广
  • 网络工程师 网站建设推广网站多少钱
  • 手机公众平台网站开发网络营销推广策略有哪些