当前位置: 首页 > news >正文

青岛外贸网站推广百度官方网站首页

青岛外贸网站推广,百度官方网站首页,检测网站速度,溧阳做网站价格springboot 连接 kafka集群 一、环境搭建1.1 springboot 环境1.2 kafka 依赖 二、 kafka 配置类2.1 发布者2.1.1 配置2.1.2 构建发布者类2.1.3 发布消息 2.2 消费者2.2.1 配置2.2.2 构建消费者类2.2.3 进行消息消费 一、环境搭建 1.1 springboot 环境 JDK 11 Maven 3.8.x spr…

springboot 连接 kafka集群

  • 一、环境搭建
    • 1.1 springboot 环境
    • 1.2 kafka 依赖
  • 二、 kafka 配置类
    • 2.1 发布者
      • 2.1.1 配置
        • 2.1.2 构建发布者类
        • 2.1.3 发布消息
    • 2.2 消费者
      • 2.2.1 配置
      • 2.2.2 构建消费者类
      • 2.2.3 进行消息消费

一、环境搭建

1.1 springboot 环境

JDK 11+
Maven 3.8.x+
springboot 2.5.4 +

1.2 kafka 依赖

springboot的pom文件导入

       <dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.4.0</version></dependency>

二、 kafka 配置类

2.1 发布者

2.1.1 配置

发布者我们使用 KafkaTemplate 来进行消息发布,所以需要先对其进行一些必要的配置。

@Configuration
@EnableKafka
public class KafkaConfig {/***** 发布者 *****///生产者工厂@Beanpublic ProducerFactory<Integer, String> producerFactory() {return new DefaultKafkaProducerFactory<>(producerConfigs());}//生产者配置@Beanpublic Map<String, Object> producerConfigs() {Map<String, Object> props = new HashMap<>();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.2.83:9092,192.168.2.84:9092,192.168.2.86:9092");props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);return props;}//生产者模板@Beanpublic KafkaTemplate<Integer, String> kafkaTemplate() {return new KafkaTemplate<>(producerFactory());}
}

2.1.2 构建发布者类

配置完发布者,下来就是发布消息,我们需要继承 ProducerListener<K, V> 接口,该接口完整信息如下:

public interface ProducerListener<K, V> {void onSuccess(ProducerRecord<K, V> producerRecord, RecordMetadata recordMetadata);void onError(ProducerRecord<K, V> producerRecord, RecordMetadata recordMetadata,Exception exception);}

实现该接口的方法,我们可以获取包含发送结果(成功或失败)的异步回调,也就是可以在这个接口的实现中获取发送结果。

我们简单的实现构建一个发布者类,接收主题和发布消息参数,并打印发布结果。

@Component
public class KafkaProducer implements ProducerListener<Object,Object> {private static final Logger producerlog = LoggerFactory.getLogger(KafkaProducer.class);private final KafkaTemplate<Integer, String> kafkaTemplate;public KafkaProducer(KafkaTemplate<Integer, String> kafkaTemplate) {this.kafkaTemplate = kafkaTemplate;}public void producer (String msg,String topic){ListenableFuture<SendResult<Integer, String>> future = kafkaTemplate.send(topic,0, msg);future.addCallback(new KafkaSendCallback<Integer, String>() {@Overridepublic void onSuccess(SendResult<Integer, String> result) {producerlog.info("发送成功 {}", result);}@Overridepublic void onFailure(KafkaProducerException ex) {ProducerRecord<Integer, String> failed = ex.getFailedProducerRecord();producerlog.info("发送失败 {}",failed);}});}}

2.1.3 发布消息

写一个controller类来测试我们构建的发布者类,这个类中打印接收到的消息,来确保信息接收不出问题。

@RestController
public class KafkaTestController {private static final Logger kafkaTestLog = LoggerFactory.getLogger(KafkaTestController.class);@Resourceprivate KafkaProducer kafkaProducer;@GetMapping("/kafkaTest")public void kafkaTest(String msg,String topic){kafkaProducer.producer(msg,topic);kafkaTestLog.info("接收到消息 {} {}",msg,topic);}
}

一切准备就绪,我们启动程序利用postman来进行简单的测试。

进行消息发布:
在这里插入图片描述

发布结果:
在这里插入图片描述
可以看到消息发送成功。

我们再看看kafka消费者有没有接收到消息:

在这里插入图片描述

看以看到,kakfa的消费者也接收到了消息。

2.2 消费者

2.2.1 配置

消息的接受有多种方式,我们这里选择的是使用 @KafkaListener 注解来进行消息接收。它的使用像下面这样:

public class Listener {@KafkaListener(id = "foo", topics = "myTopic", clientIdPrefix = "myClientId")public void listen(String data) {...}}

看起来不是太难吧,但使用这个注解,我们需要配置底层 ConcurrentMessageListenerContainer.kafkaListenerContainerFactor。

我们在原来的kafka配置类 KafkaConfig 中,继续配置消费者,大概就像下面这样

@Configuration
@EnableKafka
public class KafkaConfig {/***** 发布者 *****///生产者工厂@Beanpublic ProducerFactory<Integer, String> producerFactory() {return new DefaultKafkaProducerFactory<>(producerConfigs());}//生产者配置@Beanpublic Map<String, Object> producerConfigs() {Map<String, Object> props = new HashMap<>();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.2.83:9092,192.168.2.84:9092,192.168.2.86:9092");props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);return props;}//生产者模板@Beanpublic KafkaTemplate<Integer, String> kafkaTemplate() {return new KafkaTemplate<>(producerFactory());}/***** 消费者 *****///容器监听工厂@BeanKafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>kafkaListenerContainerFactory() {ConcurrentKafkaListenerContainerFactory<Integer, String> factory =new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory());factory.setConcurrency(3);factory.getContainerProperties().setPollTimeout(3000);return factory;}//消费者工厂@Beanpublic ConsumerFactory<Integer, String> consumerFactory() {return new DefaultKafkaConsumerFactory<>(consumerConfigs());}//消费者配置@Beanpublic Map<String, Object> consumerConfigs() {Map<String, Object> props = new HashMap<>();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.2.83:9092,192.168.2.84:9092,192.168.2.86:9092");props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);props.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, JsonDeserializer.class);props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class.getName());props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG,3000);return props;}
}

注意,要设置容器属性必须使用getContainerProperties()工厂方法。它用作注入容器的实际属性的模板

2.2.2 构建消费者类

配置好后,我们就可以使用这个注解了。这个注解的使用有多种方式:

1、用它来覆盖容器工厂的concurrency和属性

@KafkaListener(id = "myListener", topics = "myTopic",autoStartup = "${listen.auto.start:true}", concurrency = "${listen.concurrency:3}")
public void listen(String data) {...
}

2、可以使用显式主题和分区(以及可选的初始偏移量)

@KafkaListener(id = "thing2", topicPartitions ={ @TopicPartition(topic = "topic1", partitions = { "0", "1" }),@TopicPartition(topic = "topic2", partitions = "0",partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100"))})
public void listen(ConsumerRecord<?, ?> record) {...
}

3、将初始偏移应用于所有已分配的分区

@KafkaListener(id = "thing3", topicPartitions ={ @TopicPartition(topic = "topic1", partitions = { "0", "1" },partitionOffsets = @PartitionOffset(partition = "*", initialOffset = "0"))})
public void listen(ConsumerRecord<?, ?> record) {...
}

4、指定以逗号分隔的分区列表或分区范围

@KafkaListener(id = "pp", autoStartup = "false",topicPartitions = @TopicPartition(topic = "topic1",partitions = "0-5, 7, 10-15"))
public void process(String in) {...
}

5、可以向侦听器提供Acknowledgment

@KafkaListener(id = "cat", topics = "myTopic",containerFactory = "kafkaManualAckListenerContainerFactory")
public void listen(String data, Acknowledgment ack) {...ack.acknowledge();
}

6、添加标头

@KafkaListener(id = "list", topics = "myTopic", containerFactory = "batchFactory")
public void listen(List<String> list,@Header(KafkaHeaders.RECEIVED_KEY) List<Integer> keys,@Header(KafkaHeaders.RECEIVED_PARTITION) List<Integer> partitions,@Header(KafkaHeaders.RECEIVED_TOPIC) List<String> topics,@Header(KafkaHeaders.OFFSET) List<Long> offsets) {...
}

我们这里写一个简单的,只用它来接受指定主题的数据:

@Component
public class KafkaConsumer {private static final Logger consumerlog = LoggerFactory.getLogger(KafkaConsumer.class);@KafkaListener(topicPartitions  = @TopicPartition(topic = "kafka-topic-test",partitions = "0"))public void consumer (String data){consumerlog.info("消费者接收数据 {}",data);}
}

这里解释一下,因为我们进行了手动分配主题/分区,所以 注解中group.id 可以为空。若要指定group.id请在消费者配置中加上props.put(ConsumerConfig.GROUP_ID_CONFIG, “bzt001”); 或在 @TopicPartition 注解后加上 groupId = “组id”

2.2.3 进行消息消费

继续使用postman调用我们写好的发布者发布消息,观察控制台的消费者类是否有相关日志出现。
在这里插入图片描述

http://www.hengruixuexiao.com/news/50317.html

相关文章:

  • 网站开发公司开发过程营销网站建站公司
  • 文化传媒公司网站模板制作网页代码大全
  • 住房及城乡建设部信息中心网站免费网站建设模板
  • 信息网站设计方案百度指数支持数据下载吗
  • 做原型交互的网站工具泉州百度竞价公司
  • 怎么做一个购物网站关键信息基础设施安全保护条例
  • 2017两学一做竞赛网站seo网站结构优化的方法
  • 网站开发项目实例小红书seo排名优化
  • 邯郸做网站哪家好网络销售怎么样
  • 网站开发游戏河北关键词seo排名
  • 网站推广的优点郑州seo培训班
  • 开个做网站的公司深圳龙岗区优化防控措施
  • 做类型网站二十条优化
  • 做网站代理属于开设赌场罪吗厦门seo怎么做
  • 长沙旅游网站开发企业推广宣传方式
  • 西安做视频网站公司搜索引擎排名优化包括哪些方面
  • 网站建设论文 网站建设论文网站建设公司网站
  • 岳阳仲裁委员会网站建设新增宁波怎么优化seo关键词
  • 正品查询网站怎么做茂名百度seo公司
  • 杭州网站界面设计长春网站建设方案优化
  • 做策划 都上什么网站购买seo关键词排名优化官网
  • 新浪企业邮箱seo综合排名优化
  • 网站搭建公司排行榜百度一下百度一下百度一下
  • 南昌网站推广公司seo排名优化推广教程
  • 北京企业建立网站重庆网页搜索排名提升
  • 模板王网站怎么下载不了模板免费建立网站
  • gis做图网站网络推广平台软件app
  • 公司网站的建设要注意什么软文推广发稿平台
  • 市场营销策划方案ppt西安seo按天收费
  • 字体不显示wordpress太原百度seo排名