rabitmq

[复制链接]
发表于 2024-11-23 14:24:05 | 显示全部楼层 |阅读模式

1.1 如何安装rabbitMQ

系统版本:CentOS 6.5

RabbitMQ-Server:3.5.1

一、安装erlang

1.安装准备,下载安装文件

   wget https://packages.erlang-solutions.com/erlang-solutions-1.0-1.noarch.rpm

   rpm -Uvh erlang-solutions-1.0-1.noarch.rpm


2.安装erlang

   yum install erlang



3.安装完成后可以用erl命令查看是否安装成功

   erl -version




二、安装RabbitMQ Server

1.安装准备,下载RabbitMQ Server

   wget http://www.rabbitmq.com/releases/rabbitmq-server/v3.5.1/rabbitmq-server-3.5.1-1.noarch.rpm

2.安装RabbitMQ Server

   rpm --import http://www.rabbitmq.com/rabbitmq-signing-key-public.asc

   yum install rabbitmq-server-3.5.1-1.noarch.rpm



三、启动RabbitMQ

1.配置为守护进程随系统自动启动,root权限下执行:

   chkconfig rabbitmq-server on

2.启动rabbitMQ服务

   /sbin/service rabbitmq-server start



四、安装Web管理界面插件

1.安装命令

   rabbitmq-plugins enable rabbitmq\_management

2.安装成功后会显示如下内容

   The following plugins have been enabled:

     mochiweb

     webmachine

     rabbitmq\_web\_dispatch

     amqp\_client

     rabbitmq\_management\_agent

     rabbitmq\_management

   Plugin configuration has changed. Restart RabbitMQ for changes to take effect.

五、设置RabbitMQ远程ip登录

这里我们以创建个agan帐号,密码123456为例,创建一个账号并支持远程ip访问。

1.创建账号

   rabbitmqctl add\_user agan 123456

2.设置用户角色

   rabbitmqctl  set\_user\_tags  agan  administrator

3.设置用户权限

   rabbitmqctl set\_permissions -p "/" agan ".\*" ".\*" ".\*"

4.设置完成后可以查看当前用户和角色(需要开启服务)

   rabbitmqctl list\_users



浏览器输入:serverip:15672。其中serverip是RabbitMQ-Server所在主机的ip。

image.png

image.png

1.1 为什么安装rabbitMQ,它解决什么问题

同步变异步:

image.png

image.png

image.png

高内聚低耦合:解耦

image.png

流量削峰

image.png

1.1 RabbitMQ入门rabbit-mq-hello

image.png

1.1.1 pom.xml

<?xml version="1.0" encoding="UTF-8"?>

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

<modelVersion>4.0.0</modelVersion>

<groupId>com.agan.rabbit</groupId>

<artifactId>rabbit-mq-hello</artifactId>

<version>0.0.1-SNAPSHOT</version>

<packaging>jar</packaging>

<name>rabbit-mq-hello</name>

<description>Demo project for Spring Boot</description>

<parent>

<groupId>org.springframework.boot</groupId>

<artifactId>spring-boot-starter-parent</artifactId>

<version>1.5.10.RELEASE</version>

<relativePath/> <!-- lookup parent from repository -->

</parent>

<properties>

<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>

<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>

<java.version>1.8</java.version>

</properties>

<dependencies>

<dependency>

<groupId>org.springframework.boot</groupId>

 <artifactId>spring-boot-starter-amqp</artifactId>

</dependency>

<dependency>

<groupId>org.springframework.boot</groupId>

 <artifactId>spring-boot-starter-web</artifactId>

</dependency>

<dependency>

<groupId>org.springframework.boot</groupId>

 <artifactId>spring-boot-starter-test</artifactId>

 <scope>test</scope>

</dependency>


</dependencies>

<build>

<plugins>

 <plugin>

    <groupId>org.springframework.boot</groupId>

    <artifactId>spring-boot-maven-plugin</artifactId>

 </plugin>

</plugins>


</build>

</project>

### 1.1.1 Resource:application.properties

spring.application.name=springboot-amqp

server.port=8080

spring.rabbitmq.host=192.168.48.37

spring.rabbitmq.port=5672

spring.rabbitmq.username=agan

spring.rabbitmq.password=123456


### 1.1.2 启动类

```package
import org.springframework.boot.SpringApplication;

import org.springframework.boot.autoconfigure.SpringBootApplication;

/\*\*

\* @author 阿甘

\* @see http://study.163.com/provider/1016671292/index.htm

\* @version 1.0

\*/

@SpringBootApplication

public class RabbitMqHelloApplication {

public static void main(String[] args) {

SpringApplication.run(RabbitMqHelloApplication.class, args);


}

}

1.1.3 接收者Receiver.java

import org.springframework.amqp.rabbit.annotation.RabbitListener;

import org.springframework.stereotype.Component;

/\*\*

\* @author 阿甘

\* @see http://study.163.com/provider/1016671292/index.htm

\* @version 1.0

\*/

@Component

public class Receiver {

@RabbitListener(queues="hello-agan-queue")

public void process(String msg){


System.out.println("receiver:"+msg);

}

}

1.1.4 发送者Sender.java

import java.util.Date;

import org.springframework.amqp.core.AmqpTemplate;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.stereotype.Component;

/\*\*

\* @author 阿甘

\* @see http://study.163.com/provider/1016671292/index.htm

\* @version 1.0

\*/

@Component

public class Sender {

@Autowired

private AmqpTemplate rabbitTemplate;

public void send() throws InterruptedException{


String msg="hello"+new Date();

this.rabbitTemplate.convertAndSend("hello-agan-queue", msg);

}

}

1.1.5 发送者配置SenderConfig.java

import org.springframework.amqp.core.Queue;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

/\*\*

\* @author 阿甘

\* @see http://study.163.com/provider/1016671292/index.htm

\* @version 1.0

\*/

@Configuration

public class SenderConfig {

@Bean

public Queue aganqueue(){


return new Queue("hello-agan-queue");

}

}

1.1.6 测试类

import org.junit.Test;

import org.junit.runner.RunWith;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.boot.test.context.SpringBootTest;

import org.springframework.test.context.junit4.SpringRunner;

import com.agan.rabbit.RabbitMqHelloApplication;

import com.agan.rabbit.Sender;

@RunWith(SpringRunner.class)

@SpringBootTest(classes = RabbitMqHelloApplication.class)

public class RabbitMqHelloApplicationTests {

@Autowired

private Sender sender;

@Test

public void send() throws InterruptedException {

while (true) {


Thread.sleep(1000);

this.sender.send();



}

}

}

1.1 RabbitMQ通信为什么需要信道,而不是TCP

image.png

image.png

image.png

image.png

1.1 direct交换器

image.png

1.1.1 生产者rabbit-mq-direct-provider

1.1.1.1 pom.xml

<?xml version="1.0" encoding="UTF-8"?>

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

<modelVersion>4.0.0</modelVersion>

<groupId>com.agan.rabbit</groupId>

<artifactId>rabbit-mq-direct-provider</artifactId>

<version>0.0.1-SNAPSHOT</version>

<packaging>jar</packaging>

<name>rabbit-mq-direct-provider</name>

<description>Demo project for Spring Boot</description>

<parent>

<groupId>org.springframework.boot</groupId>

<artifactId>spring-boot-starter-parent</artifactId>

<version>1.5.10.RELEASE</version>

<relativePath/> <!-- lookup parent from repository -->



</parent>

<properties>


<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>

<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>

<java.version>1.8</java.version>



</properties>

<dependencies>

<dependency>


<groupId>org.springframework.boot</groupId>

<artifactId>spring-boot-starter-amqp</artifactId>



</dependency>

<dependency>


<groupId>org.springframework.boot</groupId>

<artifactId>spring-boot-starter-web</artifactId>



</dependency>

<dependency>


<groupId>org.springframework.boot</groupId>

<artifactId>spring-boot-starter-test</artifactId>

<scope>test</scope>

</dependency>

</dependencies>

<build>

<plugins>

<plugin>

<groupId>org.springframework.boot</groupId>

<artifactId>spring-boot-maven-plugin</artifactId>

</plugin>

</plugins>

</build>

</project>

1.1.1.2 Resource:application.properties

server.port=8080

spring.rabbitmq.host=192.168.48.37

spring.rabbitmq.port=5672

spring.rabbitmq.username=agan

spring.rabbitmq.password=123456

设置交换器

1.1.1.3 启动类

package com.agan.rabbit;

import org.springframework.boot.SpringApplication;

import org.springframework.boot.autoconfigure.SpringBootApplication;

/\*\*

\* @author 阿甘

\* @see http://study.163.com/provider/1016671292/index.htm

\* @version 1.0

\*/

@SpringBootApplication

public class RabbitMqHelloApplication {

public static void main(String[] args) {


SpringApplication.run(RabbitMqHelloApplication.class, args);

}

}

1.1.1.4 发送者Sender.java

import java.util.Date;

import org.springframework.amqp.core.AmqpTemplate;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.beans.factory.annotation.Value;

import org.springframework.stereotype.Component;

/\*\*

\* @author 阿甘

\* @see http://study.163.com/provider/1016671292/index.htm

\* @version 1.0

\*/

@Component

public class Sender {

@Autowired

private AmqpTemplate rabbitTemplate;

@Value("\${mq.config.exchange}")

private String exchange;

public void send() throws InterruptedException{


String msg="hello"+new Date();

this.rabbitTemplate.convertAndSend(this.exchange,"log.error.routing.key", msg);

}

}

1.1.1.5 测试类

import org.junit.Test;

import org.junit.runner.RunWith;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.boot.test.context.SpringBootTest;

import org.springframework.test.context.junit4.SpringRunner;

import com.agan.rabbit.RabbitMqHelloApplication;

import com.agan.rabbit.Sender;

@RunWith(SpringRunner.class)

@SpringBootTest(classes = RabbitMqHelloApplication.class)

public class RabbitMqHelloApplicationTests {

@Autowired

private Sender sender;

@Test

public void send() throws InterruptedException {


this.sender.send();

}

}

1.1.1 消费者rabbit-mq-topic-consumer

1.1.1.1 pom.xml

<?xml version="1.0" encoding="UTF-8"?>

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

<modelVersion>4.0.0</modelVersion>

<groupId>com.agan.rabbit</groupId>

<artifactId>rabbit-mq-topic-consumer</artifactId>

<version>0.0.1-SNAPSHOT</version>

<packaging>jar</packaging>

<name>rabbit-mq-topic-consumer</name>

<description>Demo project for Spring Boot</description>

<parent>

<groupId>org.springframework.boot</groupId>

<artifactId>spring-boot-starter-parent</artifactId>

<version>1.5.10.RELEASE</version>

<relativePath/> <!-- lookup parent from repository -->


</parent>

<properties>

<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>

<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>

<java.version>1.8</java.version>


</properties>

<dependencies>

<dependency>


<groupId>org.springframework.boot</groupId>

<artifactId>spring-boot-starter-amqp</artifactId>



</dependency>

<dependency>

<groupId>org.springframework.boot</groupId>

<artifactId>spring-boot-starter-web</artifactId>



</dependency>

<dependency>


<groupId>org.springframework.boot</groupId>

<artifactId>spring-boot-starter-test</artifactId>

<scope>test</scope>

</dependency>

</dependencies>

<build>

<plugins>

<plugin>


<groupId>org.springframework.boot</groupId>

<artifactId>spring-boot-maven-plugin</artifactId>

</plugin>

</plugins>

</build>

</project>

1.1.1.2 Resource:application

server.port=8080

spring.rabbitmq.host=192.168.48.37

spring.rabbitmq.port=5672

spring.rabbitmq.username=agan

spring.rabbitmq.password=123456

~~​#​~~~~设置交换器~~

~~mq.config.exchange=log.direct~~

~~ ~~

~~mq.config.queue.info=log.info~~

~~mq.config.queue.info.routing.key=log.info.routing.key~~

~~ ~~

~~mq.config.queue.error=log.error~~

~~mq.config.queue.error.routing.key=log.error.routing.key~~

mq.config.exchange=log.topic

mq.config.queue.info=log.info

mq.config.queue.error=log.error

mq.config.queue.logs=log.msg

1.1.1.3 启动类

package com.agan.rabbit;

import org.springframework.boot.SpringApplication;

import org.springframework.boot.autoconfigure.SpringBootApplication;

/\*\*

\* @author 阿甘

\* @see http://study.163.com/instructor/1016671292.htm

\* @version 1.0

\*/

@SpringBootApplication

public class RabbitMqHelloApplication {

public static void main(String[] args) {


SpringApplication.run(RabbitMqHelloApplication.class, args);

}

}

1.1.1.4 错误接收者ErrroReceiver

import org.springframework.amqp.core.ExchangeTypes;

import org.springframework.amqp.rabbit.annotation.Exchange;

import org.springframework.amqp.rabbit.annotation.Queue;

import org.springframework.amqp.rabbit.annotation.QueueBinding;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;

import org.springframework.amqp.rabbit.annotation.RabbitListener;

import org.springframework.stereotype.Component;

/\*\*

\* @author 阿甘

\* @see http://study.163.com/provider/1016671292/index.htm

\* @version 1.0

\*/

@Component

@RabbitListener(bindings=@QueueBinding(


value= @Queue(value="\${mq.config.queue.error}",autoDelete="true"),

exchange=@Exchange(value="\${mq.config.exchange}",type=ExchangeTypes.TOPIC),

key="\*.log.error"

)

)

public class ErrroReceiver {

@RabbitHandler

public void process(String msg){


System.out.println("---------error----------日志:"+msg);

}

}

1.1.1.5 日志接收者InfoReceiver

import org.springframework.amqp.core.ExchangeTypes;

import org.springframework.amqp.rabbit.annotation.Exchange;

import org.springframework.amqp.rabbit.annotation.Queue;

import org.springframework.amqp.rabbit.annotation.QueueBinding;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;

import org.springframework.amqp.rabbit.annotation.RabbitListener;

import org.springframework.stereotype.Component;

/\*\*

\* @author 阿甘

\* @see http://study.163.com/provider/1016671292/index.htm

\* @version 1.0

\*/

@Component

@RabbitListener(bindings=@QueueBinding(


value= @Queue(value="\${mq.config.queue.info}",autoDelete="true"),

exchange=@Exchange(value="\${mq.config.exchange}",type=ExchangeTypes.TOPIC),

key="\*.log.info"

)

)

public class InfoReceiver {

@RabbitHandler

public void process(String msg){


System.out.println("------------------info日志:"+msg);

}

}

1.1.1.6 所有日志接收者LogsReceiver

import org.springframework.amqp.core.ExchangeTypes;

import org.springframework.amqp.rabbit.annotation.Exchange;

import org.springframework.amqp.rabbit.annotation.Queue;

import org.springframework.amqp.rabbit.annotation.QueueBinding;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;

import org.springframework.amqp.rabbit.annotation.RabbitListener;

import org.springframework.stereotype.Component;

/\*\*

\* @author 阿甘

\* @see http://study.163.com/provider/1016671292/index.htm

\* @version 1.0

\*/

@Component

@RabbitListener(bindings=@QueueBinding(


value= @Queue(value="\${mq.config.queue.logs}",autoDelete="true"),

exchange=@Exchange(value="\${mq.config.exchange}",type=ExchangeTypes.TOPIC),

key="\*.log.\*"

)

)

public class LogsReceiver {

@RabbitHandler

public void process(String msg){


System.out.println("all---------------日志:"+msg);

}

}

1.1 fanout交换器

image.png

1.1.1 生产者rabbit-mq-fanout-provider

1.1.1.1 pom.xml

<?xml version="1.0" encoding="UTF-8"?>

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

<modelVersion>4.0.0</modelVersion>

<groupId>com.agan.rabbit</groupId>

<artifactId>rabbit-mq-fanout-provider</artifactId>

<version>0.0.1-SNAPSHOT</version>

<packaging>jar</packaging>

<name>rabbit-mq-fanout-provider</name>

<description>Demo project for Spring Boot</description>

<parent>


<groupId>org.springframework.boot</groupId>

<artifactId>spring-boot-starter-parent</artifactId>

<version>1.5.10.RELEASE</version>

<relativePath/> <!-- lookup parent from repository -->



</parent>

<properties>


<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>

<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>

<java.version>1.8</java.version>



</properties>

<dependencies>

<dependency>

<groupId>org.springframework.boot</groupId>

<artifactId>spring-boot-starter-amqp</artifactId>



</dependency>

<dependency>

<groupId>org.springframework.boot</groupId>

<artifactId>spring-boot-starter-web</artifactId>



</dependency>

<dependency>


<groupId>org.springframework.boot</groupId>

<artifactId>spring-boot-starter-test</artifactId>

<scope>test</scope>

</dependency>

</dependencies>

<build>

<plugins>

<plugin>


<groupId>org.springframework.boot</groupId>

<artifactId>spring-boot-maven-plugin</artifactId>

</plugin>

</plugins>

</build>

</project>

1.1.1.2 Resource:application.properties

spring.application.name=springboot-amqp
server.port=8080

spring.rabbitmq.host=192.168.48.37

spring.rabbitmq.port=5672

spring.rabbitmq.username=agan

spring.rabbitmq.password=123456

#设置交换器

mq.config.exchange=log.fanout

1.1.1.3 启动类

package
import org.springframework.boot.SpringApplication;

import org.springframework.boot.autoconfigure.SpringBootApplication;

/\*\*

\* @author 阿甘

\* @see http://study.163.com/provider/1016671292/index.htm

\* @version 1.0

\*/

@SpringBootApplication

public class RabbitMqHelloApplication {

public static void main(String[] args) {


SpringApplication.run(RabbitMqHelloApplication.class, args);

}

}

1.1.1.4 发送者Sender.java

package
import java.util.Date;

import org.springframework.amqp.core.AmqpTemplate;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.beans.factory.annotation.Value;

import org.springframework.stereotype.Component;

/\*\*

\* @author 阿甘

\* @see http://study.163.com/provider/1016671292/index.htm

\* @version 1.0

\*/

@Component

public class Sender {

@Autowired

private AmqpTemplate rabbitTemplate;

@Value("\${mq.config.exchange}")

private String exchange;

public void send() throws InterruptedException{


String msg="hello"+new Date();

this.rabbitTemplate.convertAndSend(this.exchange,"", msg);

}

}

1.1.1.5 测试类

package
import org.junit.Test;

import org.junit.runner.RunWith;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.boot.test.context.SpringBootTest;

import org.springframework.test.context.junit4.SpringRunner;

import com.agan.rabbit.RabbitMqHelloApplication;

import com.agan.rabbit.Sender;

@RunWith(SpringRunner.class)

@SpringBootTest(classes = RabbitMqHelloApplication.class)

public class RabbitMqHelloApplicationTests {

@Autowired

private Sender sender;

@Test

public void send() throws InterruptedException {


this.sender.send();

}

}

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

×

GMT+8, 2025-9-8 02:20 , Processed in 0.102211 second(s), 35 queries Archiver|手机版|小黑屋|Attic ( 京ICP备2020048627号 )

快速回复 返回顶部 返回列表