1.1 消息发送者stream-sender(发字符串)


<?xml version="1.0"?>
<!-- Maven build descriptor for the stream-sender module. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <!-- No dependency versions are declared below; presumably the parent manages them (confirm). -->
  <parent>
    <groupId>com.agan.springcloud</groupId>
    <artifactId>config</artifactId>
    <version>0.0.1-SNAPSHOT</version>
  </parent>

  <artifactId>stream-sender</artifactId>
  <name>stream-sender</name>
  <url>http://maven.apache.org</url>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  </properties>

  <dependencies>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <!-- Spring Cloud Stream with the RabbitMQ binder -->
    <dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-test</artifactId>
      <scope>test</scope>
    </dependency>
  </dependencies>

  <build>
    <plugins>
      <plugin>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-maven-plugin</artifactId>
      </plugin>
    </plugins>
  </build>
</project>
1.1.1.1 application.properties
# Application name of this module (was a copy-pasted "config-server")
spring.application.name=stream-sender
server.port=9040
eureka.client.serviceUrl.defaultZone=http://user:123456@eureka1:8761/eureka/,http://user:123456@eureka2:8761/eureka/
# Registration uses the hostname by default; register with the IP address instead
eureka.instance.preferIpAddress=true
# RabbitMQ connection settings
spring.rabbitmq.host=192.168.48.37
spring.rabbitmq.port=5672
spring.rabbitmq.username=agan
spring.rabbitmq.password=123456
启动类
package com.agan.book.stream;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.netflix.eureka.EnableEurekaClient;
import org.springframework.cloud.stream.annotation.EnableBinding;
/\*\*
\* @author 阿甘 http://study.163.com/instructor/1016671292.htm
\* @version 1.0
\*/
@SpringBootApplication
@EnableEurekaClient
@EnableBinding({ISendService.class})
public class StreamSenderApplication {
public static void main(String[] args) {
SpringApplication.run(StreamSenderApplication.class, args);
}
}
1.1.1 业务层ISendService.java
package com.agan.book.stream;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.SubscribableChannel;
/**
 * Binding interface that declares the sender's output channel.
 */
public interface ISendService {

    /**
     * Output channel registered under the name {@code agan-exchange}.
     *
     * @return the channel on which messages are published
     */
    @Output(value = "agan-exchange")
    SubscribableChannel send();
}
1.1.1 测试类StreamTests.java
package com.agan.test;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.test.context.junit4.SpringRunner;
import com.agan.book.stream.ISendService;
import com.agan.book.stream.StreamSenderApplication;
/**
 * Spring Boot test that publishes one text message through {@link ISendService}.
 */
@RunWith(SpringRunner.class)
@SpringBootTest(classes = StreamSenderApplication.class)
public class StreamTests {

    @Autowired
    private ISendService send;

    /**
     * Sends a single string payload, encoded as UTF-8 bytes, to the output channel.
     *
     * @throws InterruptedException kept from the original signature
     */
    @Test
    public void send() throws InterruptedException {
        String msg = "agan...............";
        // Encode explicitly: the no-arg getBytes() depends on the platform default charset.
        byte[] payload = msg.getBytes(java.nio.charset.StandardCharsets.UTF_8);
        // Parameterized type instead of the raw Message type.
        Message<byte[]> message = MessageBuilder.withPayload(payload).build();
        this.send.send().send(message);
    }
}
1.1 消息接收者stream-receiver

1.1.1 pom.xml
<?xml version="1.0"?>
<!-- Maven build descriptor for the stream-receiver module. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <!-- No dependency versions are declared below; presumably the parent manages them (confirm). -->
  <parent>
    <groupId>com.agan.springcloud</groupId>
    <artifactId>config</artifactId>
    <version>0.0.1-SNAPSHOT</version>
  </parent>

  <!-- Was "stream-sender" (copy-paste); this module is the receiver. -->
  <artifactId>stream-receiver</artifactId>
  <name>stream-receiver</name>
  <url>http://maven.apache.org</url>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  </properties>

  <dependencies>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <!-- Spring Cloud Stream with the RabbitMQ binder -->
    <dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-test</artifactId>
      <scope>test</scope>
    </dependency>
  </dependencies>

  <build>
    <plugins>
      <plugin>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-maven-plugin</artifactId>
      </plugin>
    </plugins>
  </build>
</project>
1.1.1.1 application.properties
spring.application.name=stream-receiver
server.port=9041
eureka.client.serviceUrl.defaultZone=http://user:123456@eureka1:8761/eureka/,http://user:123456@eureka2:8761/eureka/
# Registration uses the hostname by default; register with the IP address instead
eureka.instance.preferIpAddress=true
# RabbitMQ connection settings
spring.rabbitmq.host=192.168.48.37
spring.rabbitmq.port=5672
spring.rabbitmq.username=agan
spring.rabbitmq.password=123456
启动类
package com.agan.book.stream;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.netflix.eureka.EnableEurekaClient;
import org.springframework.cloud.stream.annotation.EnableBinding;
/\*\*
\* @author 阿甘 http://study.163.com/instructor/1016671292.htm
\* @version 1.0
\*/
@SpringBootApplication
@EnableEurekaClient
@EnableBinding({IReceiveService.class})
public class StreamReceiverApplication {
public static void main(String[] args) {
SpringApplication.run(StreamReceiverApplication.class, args);
}
}
1.1.1 业务层IReceiveService.java
package com.agan.book.stream;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;
/**
 * Binding interface that declares the receiver's input channel.
 */
public interface IReceiveService {

    /**
     * Input channel registered under the name {@code agan-exchange}.
     *
     * @return the channel from which messages are consumed
     */
    @Input(value = "agan-exchange")
    SubscribableChannel receive();
}
1.1.1 业务层实现类ReceiveService.java
package com.agan.book.stream;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.stereotype.Service;
/**
 * Listener for messages arriving on the {@code agan-exchange} input channel.
 */
@Service
@EnableBinding({IReceiveService.class})
public class ReceiveService {

    /**
     * Prints the received payload to standard output.
     *
     * @param msg raw message payload; decoded as UTF-8 text
     */
    @StreamListener("agan-exchange")
    public void onReceive(byte[] msg) {
        // Decode explicitly: new String(byte[]) depends on the platform default charset.
        String text = new String(msg, java.nio.charset.StandardCharsets.UTF_8);
        System.out.println("receive:" + text);
    }
}
1.1.1 启动14.1和14.2项目


接收者绑定了队列

1.1 讲解14.1和14.2 以上两个project


1.1 问题--一个消息重复消费
复制接收者项目,启动两个接收者(消费者)实例,搭建集群。
启动发送者后,会发现两个接收者都消费了同一条消息。

此时有两个临时队列。
通过消息分组,可以解决临时队列以及同一条消息被消费两次的问题。
1.1 消息分组stream-group-receiver

<?xml version="1.0"?>
<!-- Maven build descriptor for the stream-group-receiver module. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <!-- No dependency versions are declared below; presumably the parent manages them (confirm). -->
  <parent>
    <groupId>com.agan.springcloud</groupId>
    <artifactId>config</artifactId>
    <version>0.0.1-SNAPSHOT</version>
  </parent>

  <artifactId>stream-group-receiver</artifactId>
  <name>stream-group-receiver</name>
  <url>http://maven.apache.org</url>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  </properties>

  <dependencies>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <!-- Spring Cloud Stream with the RabbitMQ binder -->
    <dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-test</artifactId>
      <scope>test</scope>
    </dependency>
  </dependencies>

  <build>
    <plugins>
      <plugin>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-maven-plugin</artifactId>
      </plugin>
    </plugins>
  </build>
</project>
#### 1.1.1.1 application.properties
spring.application.name=stream-group-receiver
server.port=9043
eureka.client.serviceUrl.defaultZone=http://user:123456@eureka1:8761/eureka/,http://user:123456@eureka2:8761/eureka/
# Registration uses the hostname by default; register with the IP address instead
eureka.instance.preferIpAddress=true
# RabbitMQ connection settings
spring.rabbitmq.host=rabbitmq.yun
spring.rabbitmq.port=5672
spring.rabbitmq.username=agan
spring.rabbitmq.password=123456
# Destination of the inputProduct channel; corresponds to the exchange in the MQ
spring.cloud.stream.bindings.inputProduct.destination=exchangeProduct
# Consumer group; corresponds to the queue name in the MQ, and the queue is durable
spring.cloud.stream.bindings.inputProduct.group=groupProduct
启动类
package com.agan.book.stream;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.netflix.eureka.EnableEurekaClient;
import org.springframework.cloud.stream.annotation.EnableBinding;
/\*\*
\* @author 阿甘 http://study.163.com/instructor/1016671292.htm
\* @version 1.0
\*/
@SpringBootApplication
@EnableEurekaClient
@EnableBinding({IReceiveService.class})
public class StreamReceiverApplication {
public static void main(String[] args) {
SpringApplication.run(StreamReceiverApplication.class, args);
}
}
1.1.1 业务层IReceiveService.java
package com.agan.book.stream;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;
/**
 * Binding interface that declares the grouped receiver's input channel.
 */
public interface IReceiveService {

    /** Name of the input channel; also referenced by the stream listener. */
    String INPUT = "inputProduct";

    /**
     * Input channel registered under {@link #INPUT}.
     *
     * @return the channel from which messages are consumed
     */
    @Input(value = INPUT)
    SubscribableChannel receive();
}
1.1.1 业务层ReceiveService.java
package com.agan.book.stream;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.stereotype.Service;
/**
 * Listener for {@code Product} messages arriving on the
 * {@link IReceiveService#INPUT} channel.
 */
@Service
@EnableBinding(IReceiveService.class)
public class ReceiveService {

    /**
     * Prints the received object to standard output.
     *
     * @param obj the message payload
     */
    @StreamListener(IReceiveService.INPUT)
    public void onReceive(Product obj) {
        String text = obj.toString();
        System.out.println("receive:" + text);
    }
}
1.1 消息分组stream-group-sender

<?xml version="1.0"?>
<!-- Maven build descriptor for the stream-group-sender module. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <!-- No dependency versions are declared below; presumably the parent manages them (confirm). -->
  <parent>
    <groupId>com.agan.springcloud</groupId>
    <artifactId>config</artifactId>
    <version>0.0.1-SNAPSHOT</version>
  </parent>

  <artifactId>stream-group-sender</artifactId>
  <name>stream-group-sender</name>
  <url>http://maven.apache.org</url>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  </properties>

  <dependencies>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <!-- Spring Cloud Stream with the RabbitMQ binder -->
    <dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-test</artifactId>
      <scope>test</scope>
    </dependency>
  </dependencies>

  <build>
    <plugins>
      <plugin>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-maven-plugin</artifactId>
      </plugin>
    </plugins>
  </build>
</project>
spring.application.name=stream-group-sender
server.port=9044
eureka.client.serviceUrl.defaultZone=http://user:123456@eureka1:8761/eureka/,http://user:123456@eureka2:8761/eureka/
# Registration uses the hostname by default; register with the IP address instead
eureka.instance.preferIpAddress=true
# RabbitMQ connection settings
spring.rabbitmq.host=rabbitmq.yun
spring.rabbitmq.port=5672
spring.rabbitmq.username=agan
spring.rabbitmq.password=123456
# Destination of the outputProduct channel; corresponds to the exchange in the MQ
spring.cloud.stream.bindings.outputProduct.destination=exchangeProduct
package com.agan.book.stream;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.netflix.eureka.EnableEurekaClient;
import org.springframework.cloud.stream.annotation.EnableBinding;
/\*\*
\* @author 阿甘 http://study.163.com/instructor/1016671292.htm
\* @version 1.0
\*/
@SpringBootApplication
@EnableEurekaClient
@EnableBinding({ISendService.class})
public class StreamSenderApplication {
public static void main(String[] args) {
SpringApplication.run(StreamSenderApplication.class, args);
}
}
业务层
package com.agan.book.stream;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.SubscribableChannel;
/**
 * Binding interface that declares the grouped sender's output channel.
 */
public interface ISendService {

    /** Name of the output channel. */
    String OUTPUT = "outputProduct";

    /**
     * Output channel registered under {@link #OUTPUT}.
     *
     * @return the channel on which messages are published
     */
    @Output(value = OUTPUT)
    SubscribableChannel send();
}
test
package com.agan.test;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.test.context.junit4.SpringRunner;
import com.agan.book.stream.ISendService;
import com.agan.book.stream.Product;
import com.agan.book.stream.StreamSenderApplication;
/**
 * Spring Boot test that publishes {@code Product} messages through {@link ISendService}.
 */
@RunWith(SpringRunner.class)
@SpringBootTest(classes = StreamSenderApplication.class)
public class StreamTests {

    @Autowired
    private ISendService send;

    /**
     * Sends the same {@code Product} payload ten times to the output channel.
     *
     * @throws InterruptedException kept from the original signature
     */
    @Test
    public void send() throws InterruptedException {
        Product obj = new Product();
        obj.setId(100);
        obj.setName("spring cloud");
        for (int i = 0; i < 10; i++) {
            // A new Message is built per iteration, as in the original loop.
            // Parameterized type instead of the raw Message type.
            Message<Product> message = MessageBuilder.withPayload(obj).build();
            this.send.send().send(message);
        }
    }
}
1.1.1 启动14.5和14.6


关闭接收者,这个队列仍然存在,证明持久化了
1.1.1 复制项目14.5,启动两个接收者搭建集群,启动发送者
一条消息只有一个接收者收到,这就是分组的用处。
1.1 消息分区--相同消息被同一个服务消费
在消息分组的基础上,再添加分区相关的配置信息即可。
效果:如果没有分区,消息会被平均分配到集群的各个节点上;
如果添加了分区,相同的消息只会被分配到集群的同一个节点上。
第一个配置
spring.application.name=stream-group-receiver
server.port=9043
eureka.client.serviceUrl.defaultZone=http://user:123456@eureka1:8761/eureka/,http://user:123456@eureka2:8761/eureka/
# Registration uses the hostname by default; register with the IP address instead
eureka.instance.preferIpAddress=true
# RabbitMQ connection settings
spring.rabbitmq.host=rabbitmq.yun
spring.rabbitmq.port=5672
spring.rabbitmq.username=agan
spring.rabbitmq.password=123456
# Destination of the inputProduct channel; corresponds to the exchange in the MQ
spring.cloud.stream.bindings.inputProduct.destination=exchangeProduct
# Consumer group; corresponds to the queue name in the MQ, and the queue is durable
spring.cloud.stream.bindings.inputProduct.group=groupProduct
# Enable partitioning on the consumer side
spring.cloud.stream.bindings.inputProduct.consumer.partitioned=true
# Total number of consumer instances
spring.cloud.stream.instanceCount=2
# Index of this instance, starting from 0
spring.cloud.stream.instanceIndex=0
第二个配置
spring.application.name=stream-group-sender
server.port=9044
eureka.client.serviceUrl.defaultZone=http://user:123456@eureka1:8761/eureka/,http://user:123456@eureka2:8761/eureka/
# Registration uses the hostname by default; register with the IP address instead
eureka.instance.preferIpAddress=true
# RabbitMQ connection settings
spring.rabbitmq.host=rabbitmq.yun
spring.rabbitmq.port=5672
spring.rabbitmq.username=agan
spring.rabbitmq.password=123456
# Destination of the outputProduct channel; corresponds to the exchange in the MQ
spring.cloud.stream.bindings.outputProduct.destination=exchangeProduct
# Expression that determines the partition key
spring.cloud.stream.bindings.outputProduct.producer.partitionKeyExpression=payload
# Number of partitions the messages are split into
spring.cloud.stream.bindings.outputProduct.producer.partitionCount=2