消息驱动stream

[复制链接]
发表于 2024-11-24 00:21:01 | 显示全部楼层 |阅读模式

1.1 消息发送者stream-sender(发字符串)

image.png

image.png

<?xml version="1.0"?>

<project

   xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"

   xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">

   <modelVersion>4.0.0</modelVersion>

   <parent>

      <groupId>com.agan.springcloud</groupId>

      <artifactId>config</artifactId>

      <version>0.0.1-SNAPSHOT</version>

   </parent>

   <artifactId>stream-sender</artifactId>

   <name>stream-sender</name>

   <url>http://maven.apache.org</url>

   <properties>

      <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>

   </properties>

   <dependencies>

      <dependency>

         <groupId>org.springframework.boot</groupId>

         <artifactId>spring-boot-starter-web</artifactId>

      </dependency>


      <dependency>

         <groupId>org.springframework.cloud</groupId>

         <artifactId>spring-cloud-starter-stream-rabbit</artifactId>

      </dependency>

      <dependency>

         <groupId>org.springframework.boot</groupId>

         <artifactId>spring-boot-starter-test</artifactId>

         <scope>test</scope>

      </dependency>

   </dependencies>



   <build>

      <plugins>

         <plugin>

            <groupId>org.springframework.boot</groupId>

            <artifactId>spring-boot-maven-plugin</artifactId>

         </plugin>

      </plugins>

   </build>

</project>

1.1.1.1 application.properties

# Service name and HTTP port of the stream-sender module
spring.application.name=stream-sender
server.port=9040

# Eureka registry nodes this client registers with
eureka.client.serviceUrl.defaultZone=http://user:123456@eureka1:8761/eureka/,http://user:123456@eureka2:8761/eureka/

# Registration uses the hostname by default; register with the IP address instead
eureka.instance.preferIpAddress=true

# RabbitMQ connection settings
spring.rabbitmq.host=192.168.48.37
spring.rabbitmq.port=5672
spring.rabbitmq.username=agan
spring.rabbitmq.password=123456

启动类

package com.agan.book.stream;


import org.springframework.boot.SpringApplication;

import org.springframework.boot.autoconfigure.SpringBootApplication;

import org.springframework.cloud.netflix.eureka.EnableEurekaClient;

import org.springframework.cloud.stream.annotation.EnableBinding;


/\*\*

 \* @author 阿甘 http://study.163.com/instructor/1016671292.htm

 \* @version 1.0

 \*/


@SpringBootApplication

@EnableEurekaClient

@EnableBinding({ISendService.class})

public class StreamSenderApplication {


   public static void main(String[] args) {

      SpringApplication.run(StreamSenderApplication.class, args);

   }

}

1.1.1 业务层ISendService.java

package com.agan.book.stream;


import org.springframework.cloud.stream.annotation.Output;

import org.springframework.messaging.SubscribableChannel;


/**
 * Binding interface that declares the output channel used by the sender.
 */
public interface ISendService {

   /** Name of the output binding, exposed as a constant so callers need not repeat the literal. */
   String OUTPUT = "agan-exchange";

   /**
    * Returns the channel bound under {@link #OUTPUT}.
    *
    * @return the channel messages are sent to
    */
   @Output(OUTPUT)
   SubscribableChannel send();
}

1.1.1 测试类StreamTests.java

package com.agan.test;


import org.junit.Test;

import org.junit.runner.RunWith;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.boot.test.context.SpringBootTest;

import org.springframework.messaging.Message;

import org.springframework.messaging.support.MessageBuilder;

import org.springframework.test.context.junit4.SpringRunner;


import com.agan.book.stream.ISendService;

import com.agan.book.stream.StreamSenderApplication;


/**
 * Test that publishes a single text message through {@link ISendService}.
 */
@RunWith(SpringRunner.class)
@SpringBootTest(classes = StreamSenderApplication.class)
public class StreamTests {

   @Autowired
   private ISendService sendService;

   /**
    * Sends a short text as a byte-array payload on the sender's output channel.
    *
    * @throws InterruptedException kept in the signature for compatibility; the body does not throw it
    */
   @Test
   public void send() throws InterruptedException {
      String msg = "agan...............";
      // Encode with an explicit charset so the bytes do not depend on the platform default.
      byte[] payload = msg.getBytes(java.nio.charset.StandardCharsets.UTF_8);
      Message<byte[]> message = MessageBuilder.withPayload(payload).build();
      this.sendService.send().send(message);
   }
}

1.1 消息接收者stream-receiver

image.png

1.1.1 pom.xml

<?xml version="1.0"?>

<project

   xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"

   xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">

   <modelVersion>4.0.0</modelVersion>

   <parent>

      <groupId>com.agan.springcloud</groupId>

      <artifactId>config</artifactId>

      <version>0.0.1-SNAPSHOT</version>

   </parent>

   <!-- Coordinates of the receiver module (must differ from the sender module) -->
   <artifactId>stream-receiver</artifactId>

   <name>stream-receiver</name>

   <url>http://maven.apache.org</url>

   <properties>

      <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>

   </properties>

   <dependencies>

      <dependency>

         <groupId>org.springframework.boot</groupId>

         <artifactId>spring-boot-starter-web</artifactId>

      </dependency>


      <dependency>

         <groupId>org.springframework.cloud</groupId>

         <artifactId>spring-cloud-starter-stream-rabbit</artifactId>

      </dependency>

      <dependency>

         <groupId>org.springframework.boot</groupId>

         <artifactId>spring-boot-starter-test</artifactId>

         <scope>test</scope>

      </dependency>

   </dependencies>



   <build>

      <plugins>

         <plugin>

            <groupId>org.springframework.boot</groupId>

            <artifactId>spring-boot-maven-plugin</artifactId>

         </plugin>

      </plugins>

   </build>

</project>

1.1.1.1 application.properties

# Service name and HTTP port of the stream-receiver module
spring.application.name=stream-receiver
server.port=9041

# Eureka registry nodes this client registers with
eureka.client.serviceUrl.defaultZone=http://user:123456@eureka1:8761/eureka/,http://user:123456@eureka2:8761/eureka/

# Registration uses the hostname by default; register with the IP address instead
eureka.instance.preferIpAddress=true

# RabbitMQ connection settings
spring.rabbitmq.host=192.168.48.37
spring.rabbitmq.port=5672
spring.rabbitmq.username=agan
spring.rabbitmq.password=123456

启动类

package com.agan.book.stream;


import org.springframework.boot.SpringApplication;

import org.springframework.boot.autoconfigure.SpringBootApplication;

import org.springframework.cloud.netflix.eureka.EnableEurekaClient;

import org.springframework.cloud.stream.annotation.EnableBinding;


/\*\*

 \* @author 阿甘 http://study.163.com/instructor/1016671292.htm

 \* @version 1.0

 \*/


@SpringBootApplication

@EnableEurekaClient

@EnableBinding({IReceiveService.class})

public class StreamReceiverApplication {


   public static void main(String[] args) {

      SpringApplication.run(StreamReceiverApplication.class, args);

   }

}

1.1.1 业务层IReceiveService.java

package com.agan.book.stream;


import org.springframework.cloud.stream.annotation.Input;

import org.springframework.messaging.SubscribableChannel;


/**
 * Binding interface that declares the input channel used by the receiver.
 */
public interface IReceiveService {

   /** Name of the input binding, exposed as a constant so listeners need not repeat the literal. */
   String INPUT = "agan-exchange";

   /**
    * Returns the channel bound under {@link #INPUT}.
    *
    * @return the channel incoming messages arrive on
    */
   @Input(INPUT)
   SubscribableChannel receive();
}

1.1.1 业务层实现类ReceiveService.java

package com.agan.book.stream;


import org.springframework.cloud.stream.annotation.EnableBinding;

import org.springframework.cloud.stream.annotation.Input;

import org.springframework.cloud.stream.annotation.StreamListener;

import org.springframework.stereotype.Service;


/**
 * Listener that prints every message arriving on the {@code agan-exchange} binding.
 */
@Service
@EnableBinding({IReceiveService.class})
public class ReceiveService {

   /**
    * Prints the received payload to standard output, prefixed with {@code receive:}.
    *
    * @param msg raw message payload
    */
   @StreamListener("agan-exchange")
   public void onReceive(byte[] msg) {
      // Decode with an explicit charset instead of the platform default.
      String text = new String(msg, java.nio.charset.StandardCharsets.UTF_8);
      System.out.println("receive:" + text);
   }
}

1.1.1 启动14.1和14.2项目

image.png

image.png

接收者绑定了队列

image.png

1.1 讲解14.1和14.2 以上两个project

image.png

image.png

1.1 问题--一个消息重复消费

复制接收者项目,启动两个接收者(消费者)实例,搭建集群

启动发送者,会发现两个接收者都消费了该信息

image.png

RabbitMQ 中出现了两个临时队列(每个接收者实例各一个)

通过消息分组,解决临时队列和一个消息消费两次问题

1.1 消息分组stream-group-receiver

image.png

<?xml version="1.0"?>

<project

   xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"

   xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">

   <modelVersion>4.0.0</modelVersion>

   <parent>

      <groupId>com.agan.springcloud</groupId>

      <artifactId>config</artifactId>

      <version>0.0.1-SNAPSHOT</version>

   </parent>

   <artifactId>stream-group-receiver</artifactId>

   <name>stream-group-receiver</name>

   <url>http://maven.apache.org</url>

   <properties>

      <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>

   </properties>

   <dependencies>

      <dependency>

         <groupId>org.springframework.boot</groupId>

         <artifactId>spring-boot-starter-web</artifactId>

      </dependency>


      <dependency>

         <groupId>org.springframework.cloud</groupId>

         <artifactId>spring-cloud-starter-stream-rabbit</artifactId>

      </dependency>

      <dependency>

         <groupId>org.springframework.boot</groupId>

         <artifactId>spring-boot-starter-test</artifactId>

         <scope>test</scope>

      </dependency>

   </dependencies>



   <build>

      <plugins>

         <plugin>

            <groupId>org.springframework.boot</groupId>

            <artifactId>spring-boot-maven-plugin</artifactId>

         </plugin>

      </plugins>

   </build>

</project>
#### 1.1.1.1     application.properties
# Service name and HTTP port of the stream-group-receiver module
spring.application.name=stream-group-receiver
server.port=9043

# Eureka registry nodes this client registers with
eureka.client.serviceUrl.defaultZone=http://user:123456@eureka1:8761/eureka/,http://user:123456@eureka2:8761/eureka/

# Registration uses the hostname by default; register with the IP address instead
eureka.instance.preferIpAddress=true

# RabbitMQ connection settings
spring.rabbitmq.host=rabbitmq.yun
spring.rabbitmq.port=5672
spring.rabbitmq.username=agan
spring.rabbitmq.password=123456

# Destination of the input binding; on the MQ side this is the exchange
spring.cloud.stream.bindings.inputProduct.destination=exchangeProduct

# Consumer group; on the MQ side this is the queue name, and the queue is durable
spring.cloud.stream.bindings.inputProduct.group=groupProduct

启动类

package com.agan.book.stream;


import org.springframework.boot.SpringApplication;

import org.springframework.boot.autoconfigure.SpringBootApplication;

import org.springframework.cloud.netflix.eureka.EnableEurekaClient;

import org.springframework.cloud.stream.annotation.EnableBinding;


/\*\*

 \* @author 阿甘 http://study.163.com/instructor/1016671292.htm

 \* @version 1.0

 \*/


@SpringBootApplication

@EnableEurekaClient

@EnableBinding({IReceiveService.class})

public class StreamReceiverApplication {


   public static void main(String[] args) {

      SpringApplication.run(StreamReceiverApplication.class, args);

   }

}

1.1.1 业务层IReceiveService.java

package com.agan.book.stream;


import org.springframework.cloud.stream.annotation.Input;

import org.springframework.messaging.SubscribableChannel;


/**
 * Input binding definition for the product channel.
 */
public interface IReceiveService {

   /** Binding name; it is the key used in the {@code spring.cloud.stream.bindings.inputProduct.*} properties. */
   String INPUT = "inputProduct";

   /**
    * Returns the channel bound under {@link #INPUT}.
    *
    * @return the channel incoming messages arrive on
    */
   @Input(INPUT)
   SubscribableChannel receive();
}

1.1.1 业务层ReceiveService.java

package com.agan.book.stream;

import org.springframework.cloud.stream.annotation.EnableBinding;

import org.springframework.cloud.stream.annotation.Input;

import org.springframework.cloud.stream.annotation.StreamListener;

import org.springframework.stereotype.Service;

/**
 * Listener that prints every {@code Product} arriving on the {@link IReceiveService#INPUT} binding.
 */
@Service
@EnableBinding({IReceiveService.class})
public class ReceiveService {

   /**
    * Prints the received object's string form to standard output, prefixed with {@code receive:}.
    *
    * @param obj the received payload
    */
   @StreamListener(IReceiveService.INPUT)
   public void onReceive(Product obj) {
      String description = obj.toString();
      System.out.println("receive:" + description);
   }
}

1.1 消息分组stream-group-sender

image.png

<?xml version="1.0"?>

<project

   xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"

   xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">

   <modelVersion>4.0.0</modelVersion>

   <parent>

      <groupId>com.agan.springcloud</groupId>

      <artifactId>config</artifactId>

      <version>0.0.1-SNAPSHOT</version>

   </parent>

   <artifactId>stream-group-sender</artifactId>

   <name>stream-group-sender</name>

   <url>http://maven.apache.org</url>

   <properties>

      <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>

   </properties>

   <dependencies>

      <dependency>

         <groupId>org.springframework.boot</groupId>

         <artifactId>spring-boot-starter-web</artifactId>

      </dependency>


      <dependency>

         <groupId>org.springframework.cloud</groupId>

         <artifactId>spring-cloud-starter-stream-rabbit</artifactId>

      </dependency>

      <dependency>

         <groupId>org.springframework.boot</groupId>

         <artifactId>spring-boot-starter-test</artifactId>

         <scope>test</scope>

      </dependency>

   </dependencies>



   <build>

      <plugins>

         <plugin>

            <groupId>org.springframework.boot</groupId>

            <artifactId>spring-boot-maven-plugin</artifactId>

         </plugin>

      </plugins>

   </build>

</project>
# application.properties of the stream-group-sender module

# Service name and HTTP port
spring.application.name=stream-group-sender
server.port=9044

# Eureka registry nodes this client registers with
eureka.client.serviceUrl.defaultZone=http://user:123456@eureka1:8761/eureka/,http://user:123456@eureka2:8761/eureka/

# Registration uses the hostname by default; register with the IP address instead
eureka.instance.preferIpAddress=true

# RabbitMQ connection settings
spring.rabbitmq.host=rabbitmq.yun
spring.rabbitmq.port=5672
spring.rabbitmq.username=agan
spring.rabbitmq.password=123456

# Destination of the output binding; on the MQ side this is the exchange
spring.cloud.stream.bindings.outputProduct.destination=exchangeProduct
package com.agan.book.stream;


import org.springframework.boot.SpringApplication;

import org.springframework.boot.autoconfigure.SpringBootApplication;

import org.springframework.cloud.netflix.eureka.EnableEurekaClient;

import org.springframework.cloud.stream.annotation.EnableBinding;


/\*\*

 \* @author 阿甘 http://study.163.com/instructor/1016671292.htm

 \* @version 1.0

 \*/


@SpringBootApplication

@EnableEurekaClient

@EnableBinding({ISendService.class})

public class StreamSenderApplication {


   public static void main(String[] args) {

      SpringApplication.run(StreamSenderApplication.class, args);

   }

}

业务层

package com.agan.book.stream;


import org.springframework.cloud.stream.annotation.Output;

import org.springframework.messaging.SubscribableChannel;


/**
 * Output binding definition for the product channel.
 */
public interface ISendService {

   /** Binding name; it is the key used in the {@code spring.cloud.stream.bindings.outputProduct.*} properties. */
   String OUTPUT = "outputProduct";

   /**
    * Returns the channel bound under {@link #OUTPUT}.
    *
    * @return the channel messages are sent to
    */
   @Output(OUTPUT)
   SubscribableChannel send();
}

test

package com.agan.test;


import org.junit.Test;

import org.junit.runner.RunWith;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.boot.test.context.SpringBootTest;

import org.springframework.messaging.Message;

import org.springframework.messaging.support.MessageBuilder;

import org.springframework.test.context.junit4.SpringRunner;


import com.agan.book.stream.ISendService;

import com.agan.book.stream.Product;

import com.agan.book.stream.StreamSenderApplication;


/**
 * Test that publishes ten {@code Product} messages through {@link ISendService}.
 */
@RunWith(SpringRunner.class)
@SpringBootTest(classes = StreamSenderApplication.class)
public class StreamTests {

   @Autowired
   private ISendService sendService;

   /**
    * Sends the same product (id 100, name "spring cloud") ten times on the output channel.
    *
    * @throws InterruptedException kept in the signature for compatibility; the body does not throw it
    */
   @Test
   public void send() throws InterruptedException {
      Product obj = new Product();
      obj.setId(100);
      obj.setName("spring cloud");

      for (int i = 0; i < 10; i++) {
         // A new message is built per iteration, as in the original, so each send is a distinct message.
         Message<Product> message = MessageBuilder.withPayload(obj).build();
         this.sendService.send().send(message);
      }
   }
}

1.1.1 启动14.5和14.6

image.png

image.png

关闭接收者,这个队列仍然存在,证明持久化了

1.1.1 复制项目14.5,启动两个接收者搭建集群,启动发送者

一条消息只有一个接收者收到,这就是分组的用处。

1.1 消息分区--相同消息被同一个服务消费

就是消息分组后 再添加配置信息即可

效果:如果没有分区,消息会被平均分配到集群的各个节点上;

如果添加了分区,分区键相同的消息只会被分配到集群中的同一个节点上。

第一个配置

# Service name and HTTP port of the stream-group-receiver module
spring.application.name=stream-group-receiver
server.port=9043

# Eureka registry nodes this client registers with
eureka.client.serviceUrl.defaultZone=http://user:123456@eureka1:8761/eureka/,http://user:123456@eureka2:8761/eureka/

# Registration uses the hostname by default; register with the IP address instead
eureka.instance.preferIpAddress=true

# RabbitMQ connection settings
spring.rabbitmq.host=rabbitmq.yun
spring.rabbitmq.port=5672
spring.rabbitmq.username=agan
spring.rabbitmq.password=123456

# Destination of the input binding; on the MQ side this is the exchange
spring.cloud.stream.bindings.inputProduct.destination=exchangeProduct

# Consumer group; on the MQ side this is the queue name, and the queue is durable
spring.cloud.stream.bindings.inputProduct.group=groupProduct

# Enable partitioning on the consumer side
spring.cloud.stream.bindings.inputProduct.consumer.partitioned=true

# Total number of consumer instances
spring.cloud.stream.instanceCount=2

# Index of this instance, starting at 0
spring.cloud.stream.instanceIndex=0

第二个配置

# Service name and HTTP port of the stream-group-sender module
spring.application.name=stream-group-sender
server.port=9044

# Eureka registry nodes this client registers with
eureka.client.serviceUrl.defaultZone=http://user:123456@eureka1:8761/eureka/,http://user:123456@eureka2:8761/eureka/

# Registration uses the hostname by default; register with the IP address instead
eureka.instance.preferIpAddress=true

# RabbitMQ connection settings
spring.rabbitmq.host=rabbitmq.yun
spring.rabbitmq.port=5672
spring.rabbitmq.username=agan
spring.rabbitmq.password=123456

# Destination of the output binding; on the MQ side this is the exchange
spring.cloud.stream.bindings.outputProduct.destination=exchangeProduct

# Expression that yields the partition key for each outgoing message
spring.cloud.stream.bindings.outputProduct.producer.partitionKeyExpression=payload

# Number of partitions
spring.cloud.stream.bindings.outputProduct.producer.partitionCount=2

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

×

GMT+8, 2025-9-8 02:22 , Processed in 0.098166 second(s), 35 queries Archiver|手机版|小黑屋|Attic ( 京ICP备2020048627号 )

快速回复 返回顶部 返回列表