朋也的博客 » 首页 » 文章

SpringCloud学习记录 - Stream

作者:朋也
日期:2021-01-20


版权声明:自由转载-非商用-非衍生-保持署名(创意共享3.0许可证

Stream是啥?

Stream做的事跟hibernate一样,是个翻译器,hibernate适配不同的数据库,stream适配不同的队列

创建模块

创建一个stream-provider和两个 stream-consumer1, stream-consumer2 模块

stream-provider引入依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
<!--<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>-->

两个消费者引入的依赖

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
<!--<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>-->

配置

stream-provider模块的配置

server.port=18087

spring.application.name=stream-provider

# rabbit 配置
spring.cloud.stream.binders.myMQ.type=rabbit
#spring.cloud.stream.binders.myMQ.environment.spring.rabbitmq.host=localhost
#spring.cloud.stream.binders.myMQ.environment.spring.rabbitmq.port=5672
#spring.cloud.stream.binders.myMQ.environment.spring.rabbitmq.username=guest
#spring.cloud.stream.binders.myMQ.environment.spring.rabbitmq.password=guest

# kafka 配置
#spring.cloud.stream.binders.myMQ.type=kafka
#spring.cloud.stream.kafka.binder.brokers=localhost
#spring.cloud.stream.kafka.binder.defaultBrokerPort=9092

spring.cloud.stream.bindings.output.destination=stream-provider-test
# 下面配置的myMQ是上面配置的binders
spring.cloud.stream.bindings.output.binder=myMQ

stream-consumer1模块的配置

server.port=18088

spring.application.name=stream-consumer1

spring.cloud.stream.binders.myMQ.type=rabbit
#spring.cloud.stream.binders.myMQ.environment.spring.rabbitmq.host=localhost
#spring.cloud.stream.binders.myMQ.environment.spring.rabbitmq.port=5672
#spring.cloud.stream.binders.myMQ.environment.spring.rabbitmq.username=guest
#spring.cloud.stream.binders.myMQ.environment.spring.rabbitmq.password=guest

#spring.cloud.stream.binders.myMQ.type=kafka
#spring.cloud.stream.kafka.binder.brokers=localhost
#spring.cloud.stream.kafka.binder.defaultBrokerPort=9092

spring.cloud.stream.bindings.input.destination=stream-provider-test
spring.cloud.stream.bindings.input.binder=myMQ
# 给消费者添加上组名,只要组名相同,消费消息的时候就不会重复
spring.cloud.stream.bindings.input.group=consumer-group-test

stream-consumer2模块的配置

server.port=18089

spring.application.name=stream-consumer1

spring.cloud.stream.binders.myMQ.type=rabbit
#spring.cloud.stream.binders.myMQ.environment.spring.rabbitmq.host=localhost
#spring.cloud.stream.binders.myMQ.environment.spring.rabbitmq.port=5672
#spring.cloud.stream.binders.myMQ.environment.spring.rabbitmq.username=guest
#spring.cloud.stream.binders.myMQ.environment.spring.rabbitmq.password=guest

#spring.cloud.stream.binders.myMQ.type=kafka
#spring.cloud.stream.kafka.binder.brokers=localhost
#spring.cloud.stream.kafka.binder.defaultBrokerPort=9092

spring.cloud.stream.bindings.input.destination=stream-provider-test
spring.cloud.stream.bindings.input.binder=myMQ
spring.cloud.stream.bindings.input.group=consumer-group-test

原链文接: https://tomoya92.github.io/2021/01/20/spring-cloud-stream/

提供者

消息提供者发送消息到队列中,使用 @EnableBinding 来开启消息功能

package com.example.springcloudtutorial;

import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Processor;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;

import javax.annotation.Resource;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

@EnableBinding(Processor.class)
public class MessageProviderService {

    @Resource
    private MessageChannel output;

    public String send() {
        String uuid = UUID.randomUUID().toString();
        Map<String, Object> payload = new HashMap<>();
        payload.put("uuid", uuid);
        output.send(MessageBuilder.withPayload(payload).build());
        return uuid;
    }

}

写一个接口,调用一次发一条消息

package com.example.springcloudtutorial;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;

@SpringBootApplication
@RestController
public class StreamProviderApplication {

    @Resource
    private MessageProviderService messageProviderService;

    @GetMapping("/sendMsg")
    public ResponseEntity<String> sendMsg() {
        String uuid = messageProviderService.send();
        System.out.println("provider: " + uuid);
        return ResponseEntity.ok(uuid);
    }

    public static void main(String[] args) {
        SpringApplication.run(StreamProviderApplication.class, args);
    }
}

消费者

同样使用 @EnableBinding 开启消息功能,同时使用 @StreamListener 来监听消费哪的消息

StreamConsumer1Application.java

package com.example.springcloudtutorial;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Processor;
import org.springframework.messaging.Message;

import java.util.Map;

@SpringBootApplication
@EnableBinding(Processor.class)
public class StreamConsumer1Application {

    @StreamListener(Processor.INPUT)
    public void receiveMsg(Message<Map<String, Object>> message) {
        Map<String, Object> payload = message.getPayload();
        System.out.println("consumer2: " + payload.toString());
    }

    public static void main(String[] args) {
        SpringApplication.run(StreamConsumer1Application.class, args);
    }
}

StreamConsumer2Application.java

package com.example.springcloudtutorial;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Processor;
import org.springframework.messaging.Message;

import java.util.Map;

@SpringBootApplication
@EnableBinding(Processor.class)
public class StreamConsumer2Application {

    @StreamListener(Processor.INPUT)
    public void receiveMsg(Message<Map<String, Object>> message) {
        Map<String, Object> payload = message.getPayload();
        System.out.println("consumer2: " + payload.toString());
    }

    public static void main(String[] args) {
        SpringApplication.run(StreamConsumer2Application.class, args);
    }
}

总结

org.springframework.cloud.stream.annotation 这个包下的几个注解都标注了不推荐使用了,但官方文档上又没说明用哪个来代替