JMS是什么
JMS即Java消息服务(Java Message Service)应用程序接口,是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。Java消息服务是一个与具体平台无关的API,绝大多数MOM提供商都对JMS提供支持。我们可以简单的理解:两个应用程序之间需要进行通信,我们使用一个JMS服务,进行中间的转发,通过JMS 的使用,我们可以解除两个程序之间的耦合。
JMS规范
JMS定义了Java 中访问消息中间件的接口,并没有给予实现,实现JMS接口的消息中间件(MOM)称为JMS Provider。例如:ActiveMQ、redis、rabbitMQ、activeMQ、zeroMQ、Kafka等。
Asynchronous(异步)
JMS 原本就是一个异步的消息服务,客户端获取消息的时候,不需要主动发送请求,消息会自动发送给可用的客户端。
Reliable(可靠)
JMS保证消息只会递送一次。大家都遇到过重复创建消息问题,而JMS能帮你避免该问题。
JMS的消息模型
在JMS API出现之前,大部分产品使用“点对点”和“发布/订阅”中的任一方式来进行消息通讯。JMS定义了这两种消息发送模型的规范,它们相互独立。任何JMS的提供者可以实现其中的一种或两种模型,这是它们自己的选择。JMS规范提供了通用接口保证我们基于JMS API编写的程序适用于任何一种模型。
Point To Point
在点对点通信模式中,应用程序由消息队列、发送方、接收方组成。每个消息都被发送到一个特定的队列,接收者从队列中获取消息。队列保留着消息,直到他们被消费或超时。模型如下图:
特点:
- 每个消息只有一个消费者。
- 发送者和接收者在时间上是没有时间的约束,也就是说发送者在发送完消息之后,不管接收者有没有接受消息,都不会影响发送方发送消息到消息队列中。
- 发送方不管是否在发送消息,接收方都可以从消息队列中去到消息。
- 接收方在接收完消息之后,需要向消息队列应答成功。
Publish/Subscribe
在发布/订阅消息模型中,发布者发布一个消息,该消息通过topic传递给所有的客户端。该模式下,发布者与订阅者都是匿名的,即发布者与订阅者都不知道对方是谁。并且可以动态的发布与订阅Topic。Topic主要用于传递消息。
特点:
- 一个消息可以传递个多个订阅者(即:一个消息可以有多个接受方)。
- 发布者与订阅者具有时间约束,针对某个主题(Topic)的订阅者,它必须创建一个订阅者之后,才能消费发布者的消息,而且为了消费消息,订阅者必须保持运行的状态。
- 为了缓和这样严格的时间相关性,JMS允许订阅者创建一个可持久化的订阅。这样,即使订阅者没有被激活(运行),它也能接收到发布者的消息。
JMS的编程模型
首先解释一下JMS编程模型中的名词意思。
连接工厂(Connection Factories):创建Connection对象的工厂,针对两种不同的jms消息模型,分别有QueueConnectionFactory和TopicConnectionFactory两种。可以通过JNDI来查找ConnectionFactory对象。客户端使用一个连接工厂对象连接到JMS服务提供者,它创建了JMS服务提供者和客户端之间的连接。JMS客户端(如发送者或接受者)会在JNDI名字空间中搜索并获取该连接。使用该连接,客户端能够与目的地通讯,往队列或话题发送/接收消息。
目的地(Destination):目的地指明消息被发送的目的地以及客户端接收消息的来源。JMS使用两种目的地,队列和话题。
连接对象(Connections):Connection表示在客户端和JMS系统之间建立的链接(对TCP/IP socket的包装)。Connection可以产生一个或多个Session。跟ConnectionFactory一样,Connection也有两种类型:QueueConnection和TopicConnection。
连接对象封装了与JMS提供者之间的虚拟连接,如果我们有一个ConnectionFactory对象,可以使用它来创建一个连接。会话(Sessions):Session 是我们对消息进行操作的接口,可以通过session创建生产者、消费者、消息等。Session 提供了事务的功能,如果需要使用session发送/接收多个消息时,可以将这些发送/接收动作放到一个事务中。
消息生产者(Message Producers):消息生产者由Session创建,用于往目的地发送消息。生产者实现MessageProducer接口,我们可以为目的地、队列或话题创建生产者;
消息消费者(Message Consumers):消息消费者由Session创建,用于接收被发送到Destination的消息。
消息监听者(Message Listeners):消息监听器。如果注册了消息监听器,一旦消息到达,将自动调用监听器的onMessage方法。EJB中的MDB(Message-Driven Bean)就是一种MessageListener。
模型关系:
activeMQ
ActiveMQ是Apache下的开源项目,完全支持JMS1.1和J2EE1.4规范的JMS Provider实现。
特点:
- 支持多种语言编写客户端
- 对spring的支持,很容易和spring整合
- 支持多种传输协议:TCP,SSL,NIO,UDP等
- 支持AJAX
消息形式:
- 点对点(queue)
- 一对多(topic)
下载地址
下载地址:
http://activemq.apache.org/activemq-5156-release.html
去页面下载最新稳定版本即可
启动服务
下载完成后,解压到一个磁盘目录中去,然后打开命令行窗口,进入到安装目录的bin目录下,输入命令activemq start即可启动ActiveMQ服务。
打开浏览器,输入http://localhost:8161/admin/
账号密码均为admin,即可登录ActiveMQ的控制台页面。
Java中使用activeMQ
创建一个Maven工厂,在pom.xml中配置Jar包依赖:1
2
3
4
5
6
7
8<dependencies>
<!-- https://mvnrepository.com/artifact/org.apache.activemq/activemq-all -->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.15.8</version>
</dependency>
</dependencies>
下列代码的注释已经写得很清楚了,过多的解释就不再写了!
Point To Point
点对点模式中:
消息生产者:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36public class Producter {
public static void main(String[] args) throws JMSException {
//1.创建连接工厂对象 tcp协议,端口号为61616
ConnectionFactory factory = new ActiveMQConnectionFactory("","","tcp://localhost:61616");
//2.创建连接工厂
Connection conn = factory.createConnection();
//3.启动连接
conn.start();
//4.创建会话
//第一个参数:是否开启事务 boolean
//第二个参数;设置签收模型
// Session.AUTO_ACKNOWLEDGE:客户端自动签收
// Session.CLIENT_ACKNOWLEDGE:客户端手动签收,必须要调用消息队像的acknowledge()方法才签收
// Session.DUPS_OK_ACKNOWLEDGE:签收不签收不所谓 有可能会有消息被重复消费
Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
//5.创建消息目的地 - 队列
Destination queue = session.createQueue("phones");
//6.创建消息的生成者对象
MessageProducer pro = session.createProducer(queue);
//7.发送消息
Message message = session.createTextMessage("15879171525");
pro.send(message);
//8.关闭连接
session.close();
conn.close();
}
}
消费者只是第6、7步和生产者不一样。
消费者:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35public class Producter {
public static void main(String[] args) throws JMSException {
//1.创建连接工厂对象 tcp协议,端口号为61616
ConnectionFactory factory = new ActiveMQConnectionFactory("","","tcp://localhost:61616");
//2.创建连接工厂
Connection conn = factory.createConnection();
//3.启动连接
conn.start();
//4.创建会话
//第一个参数:是否开启事务 boolean
//第二个参数;设置签收模型
// Session.AUTO_ACKNOWLEDGE:客户端自动签收
// Session.CLIENT_ACKNOWLEDGE:客户端手动签收,必须要调用消息队像的acknowledge()方法才签收
// Session.DUPS_OK_ACKNOWLEDGE:签收不签收不所谓 有可能会有消息被重复消费
Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
//5.创建消息目的地 - 队列
Destination queue = session.createQueue("phones");
// 6.创建消息的消费者对象
MessageConsumer cons = session.createConsumer(queue);
// 7.接收消息 阻塞的
TextMessage message = (TextMessage) cons.receive();
//8.关闭连接
session.close();
conn.close();
}
}
Publish/Subscribe
消息/订阅模式。客户端(订阅者)要开启服务,监听Topic,当有新的消息被写入,订阅者立即可以得到消息。
消息生产者:与点对点模式的生产者只有第5步不一样。1
2//5.创建消息目的地 - 队列
Destination queue = session.createQueue("phones");
改为:1
2// 5.创建消息目的地 - 队列
Destination queue = session.createTopic(topicName);
消息订阅者:与点对点模式的消费者也只有第5步不一样。1
2//5.创建消息目的地 - 队列
Destination queue = session.createQueue("phones");
改为:1
2// 5.创建消息目的地 - 队列
Destination queue = session.createTopic(topicName);
工具类
可以看到上面有很多重复的代码,我们可以将其封装一下:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212public class ActiveMQUtil {
private static ConnectionFactory factory;
static {
try {
Properties prop = new Properties();
prop.load(ActiveMQUtil.class.getResourceAsStream("/activeMQ.properties"));
factory = new ActiveMQConnectionFactory(prop.getProperty("MQ.username"), prop.getProperty("MQ.password"),
prop.getProperty("MQ.url"));
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
//点对单模式,消息生产者
public static void sendMessageToQueue(String phone, String queueName) {
// 2.创建连接工厂
Connection conn = null;
Session session = null;
try {
conn = factory.createConnection();
// 3.启动连接
conn.start();
// 4.创建会话
// 第一个参数:是否开启事务 boolean
// 第二个参数;设置签收模型
// Session.AUTO_ACKNOWLEDGE:客户端自动签收
// Session.CLIENT_ACKNOWLEDGE:客户端手动签收,必须要调用消息队像的acknowledge()方法才签收
// Session.DUPS_OK_ACKNOWLEDGE:签收不签收不所谓 有可能会有消息被重复消费
session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 5.创建消息目的地 - 队列
Destination queue = session.createQueue(queueName);
// 6.创建消息的生成者对象
MessageProducer pro = session.createProducer(queue);
// 7.发送消息
Message message = session.createTextMessage(phone);
pro.send(message);
} catch (Exception e) {
// TODO: handle exception
} finally {
// 8.关闭连接
try {
session.close();
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
try {
conn.close();
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
//点对单模式消息接受者
public static String reviceMessageFormQueue(String queueName) {
// 2.创建连接工厂
Connection conn = null;
Session session = null;
String mes = null;
try {
conn = factory.createConnection();
// 3.启动连接
conn.start();
// 4.创建会话
// 第一个参数:是否开启事务 boolean
// 第二个参数;设置签收模型
// Session.AUTO_ACKNOWLEDGE:客户端自动签收
// Session.CLIENT_ACKNOWLEDGE:客户端手动签收,必须要调用消息队像的acknowledge()方法才签收
// Session.DUPS_OK_ACKNOWLEDGE:签收不签收不所谓 有可能会有消息被重复消费
session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 5.创建消息目的地 - 队列
Destination queue = session.createQueue(queueName);
// 6.创建消息的消费者对象
MessageConsumer cons = session.createConsumer(queue);
// 7.接收消息 阻塞的
TextMessage message = (TextMessage) cons.receive();
mes = message.getText();
} catch (Exception e) {
// TODO: handle exception
} finally {
// 8.关闭连接
try {
if(session != null)
session.close();
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
try {
if(conn != null)
conn.close();
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
return mes;
}
//消息订阅模式 生产者
public static void sendMessageToTopic(String phone, String topicName) {
// 2.创建连接工厂
Connection conn = null;
Session session = null;
try {
conn = factory.createConnection();
// 3.启动连接
conn.start();
// 4.创建会话
// 第一个参数:是否开启事务 boolean
// 第二个参数;设置签收模型
// Session.AUTO_ACKNOWLEDGE:客户端自动签收
// Session.CLIENT_ACKNOWLEDGE:客户端手动签收,必须要调用消息队像的acknowledge()方法才签收
// Session.DUPS_OK_ACKNOWLEDGE:签收不签收不所谓 有可能会有消息被重复消费
session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 5.创建消息目的地 - 队列
Destination queue = session.createTopic(topicName);
// 6.创建消息的生成者对象
MessageProducer pro = session.createProducer(queue);
// 7.发送消息
Message message = session.createTextMessage(phone);
pro.send(message);
} catch (Exception e) {
// TODO: handle exception
} finally {
// 8.关闭连接
try {
session.close();
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
try {
conn.close();
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
//消息订阅模式 订阅者
public static String reviceMessageFormTopic(String topicName) {
// 2.创建连接工厂
Connection conn = null;
Session session = null;
String mes = null;
try {
conn = factory.createConnection();
// 3.启动连接
conn.start();
// 4.创建会话
// 第一个参数:是否开启事务 boolean
// 第二个参数;设置签收模型
// Session.AUTO_ACKNOWLEDGE:客户端自动签收
// Session.CLIENT_ACKNOWLEDGE:客户端手动签收,必须要调用消息队像的acknowledge()方法才签收
// Session.DUPS_OK_ACKNOWLEDGE:签收不签收不所谓 有可能会有消息被重复消费
session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 5.创建消息目的地 - 队列
Destination queue = session.createTopic(topicName);
// 6.创建消息的消费者对象
MessageConsumer cons = session.createConsumer(queue);
// 7.接收消息 阻塞的
TextMessage message = (TextMessage) cons.receive();
mes = message.getText();
} catch (Exception e) {
// TODO: handle exception
} finally {
// 8.关闭连接
try {
if(session != null)
session.close();
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
try {
if(conn != null)
conn.close();
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
return mes;
}
}