J2EE的异步消息机制

/ns/wz/otherwz/data/20020806022816.htm

转帖

  
  在分布式企业级应用程序中,异步消息机制用于有效地协调各个部分的工作。

  J2EE为我们提供了JMS和消息驱动豆(Message-Driven Bean),用来实现应用程序各个部件之间的异步消息传递。

  一.什么是消息系统?

  通常一个消息系统允许分开的未耦合的应用程序之间可靠地异步通信。在企业应用时,需要一种异步的,非阻塞的消息传递。比如,一个客户端可能希望给一个服务器发送一个请求后,不在乎是否马上能得到回应。这样,客户端没有理由必须等待服务器处理请求。客户端应用程序在递交一个请求之后,只需确保请求到达服务器端后,就可以处理其他任务。通常,这是很高效的。消息系统提供了许多其他分布式对象计算模型没有的优点。它鼓励在消息产生者和使用者之间的"松耦合",在它们之间有很高程度的事务处理。对于使用者,它不在乎谁产生了消息,产生者是否仍在网络上以及消息是什么时候产生的。这就允许建立动态的,可靠的和灵活的系统。整个的子系统能被修改而不会影响系统的其他部分。
  另外的优点包括:系统的高度可扩展性,容易与其他系统进行集成,以及高度的可靠性。由于可靠性和可扩展性,使得它们用于解决许多商业和科学计算问题。比如,消息系统是许多应用程序的基础,这些应用程序可以是工作流,网络管理,通信服务或供应链管理程序。在JAVA技术中,处理异步消息的能力是通过JMS来实现的。JMS最初设计是为了给传统的消息对象中间件提供一个标准的JAVA接口。而这些产品是在一个企业级应用程序中必须的。现在出现了许多支持JMS的纯JAVA的产品。

  消息系统类型

  通常有两种消息类型。

  1.发布/订阅(publish/subscribe)

  发布/订阅消息系统支持一个事件驱动模型,消息产生者和使用者都参与消息的传递。产生者发布事件,而使用者订阅感兴趣的事件,并使用事件。产生者将消息和一个特定的主题(Topic)连在一起,消息系统根据使用者注册的兴趣,将消息传给使用者。

  2.点对点(Peer to peer)

  在点对点的消息系统中,消息分发给一个单独的使用者。它维持一个"进入"消息队列。消息应用程序发送消息到一个特定的队列,而客户端从一个队列中得到消息。

  二.JMS简介

  JMS的目的是提供给消息系统客户一个固定的接口,而且与底层的消息提供者无关。这样,客户端的应用程序可以在不同的机器和操作系统中移植,而且能在不同的消息系统产品之间转移。JMS客户端都是建立在JAVA技术上的,从而也能使用其他JAVAAPI,如JDBC数据库连接,使用JAVABEAN组件模型,JDNI名字服务,JTA客户端事务处理控制以及J2SE和J2EE API来实现企业级应用服务程序。

  1.JMS对象模型


  图1显示了JMS对象,用于提供JMS客户端与JMS服务提供者相连的对象。

  ConnectionFactory是一个客户端用来创建一个Connection的管理对象。由于在Connection创建时有授权和通信建立过程,因此这个对象是比较大的。

  Destination对象将一个消息的目的和服务提供者有关的地址及配置信息包装起来。

  Session是JMS实体,用来支持事务处理和异步消息消费。JMS并不需要客户端的代码用于异步消息消费或能处理多个并发消息。通常,事务的复杂性都由一个Session来封装。

  一个Session是一个原子单位的工作,与数据库的事务一样,要实现多线程事务比较困难。Session提供了在一个线程编程模式下的并发的优点。

  MessageProducer和MessageConsumer对象由Session对象创建。用于发送和接受消息。为了确保消息的传递,JMS服务提供者处理的消息都要处于PERSISTENT模式。PERSISTENT模式使得JMS提供者出问题后,也能让消息保存下来。

  Session,MessageProducer和MessageConsumer都不支持并发,而ConnectionFactory,Destination和Connection都支持并发。

  2.JMS应用程序开发

  JMS中的消息

  在消息系统中,应用程序之间通信的关键是消息。因此使用JMS必须要先理解消息。

  在JMS中,消息由三部分组成:

  MESSAGE HEADER用于识别消息,比如用于判断一个给定的消息是否是一个"订阅者"

  PROPERITIES用于与应用程序相关的,提供者相关的和可选项的信息

  BODY是消息的内容,支持几种格式,包括TextMessage(对String一个简单的封装)和ObjectMessage(对任意对象的封装,但必须支持序列化),也支持其他格式。

  TextMessage

  一个TextMessage是一个String对象的封装。在只有文本对象传递时,是很有用的。它假设许多消息系统是建立在XML上的。从而TextMessage就可以成为包装它们的容器。

  创建一个TextMessage对象很简单,如下面的代码:

  TextMessage message=session.createMessage();

  message.setText("Hello, world!");

  ObjectMessage

  如名字所示,它是对一个JAVA对象的封装的消息。任何可序列化的JAVA对象都能用于ObjectMessage,如果必须将多个对象封装在一个消息里传递,可以使用Collection对象,来包括多个序列化对象。

  下面是创建一个ObjectMessage

  ObjectMessage message=session.createObjectMessage();

  message.setObject(myObject);

  创建一个JMS客户端程序

  一个典型的JMS客户端由下面的几个基本步骤来创建:

  创建一个到消息系统提供者的连接(Connection)

  创建一个Session,用于接收和发送消息

  创建MessageProducer和MessageConsumer来创建和接收消息

  当完成了上述步骤后,一个消息产生者客户端将创建并发布消息到一个主题,而消息使用者客户端会接收与一个主题相关的消息。

  1.创建一个Connection

  一个Connection提供客户端对底层的消息系统的访问。并实现资源的分配和管理。通过使用一个ConnectionFactory来创建一个Connection,通常用JDNI来指定:



Connection message=new initialContext();
TopicConnectionFactory topicConnectionFactory=(TopicConnectionFactory);
topic = (Topic) jndiContext.lookup(topicName);
topicConnection =topicConnectionFactory.createTopicConnection();

  2.创建一个Session

  Session是一个比较大的JMS对象,他提供了生产和消费消息的手段。用于创建消息使用者和消息产生者。

  topicSession = topicConnection.createTopicSession(false,Session.AUTO_ACKNOWLEDGE);

  两个参数用于控制事务和消息确认。

  3.定位一个Topic

  用JDNI来定位一个Topic,Topic用于识别发送或接收的消息,在发布/订阅系统中。订阅者订阅一个给定的Topic,而发布者将它发布的消息与一个Topic相连。

  下面是创建一个Topic "WeatherReport"

  Topic weatherTopic=messaging.lookup("WeatherReport");

  4.启动Connection

  在上面的初始化步骤之后,消息流是禁止的,用于防止在初始化时发生不可预料的行为。一旦初始化结束,必须让Connection启动消息系统。

  topicConnection.start();

  5.创建一个消息产生者

  在发布/订阅里,一个产生者发布消息到一个指定的Topic。下面的代码显示创建一个产生者,以及后续的建立和发布一个简单文本消息。

  TopicPublisher publisher=session.createPublisher(weatherTopic);

  TexeMessage message=session.createMessage();

  message.setText("ssss");

  publisher.publish(message);

  下面是一个消息使用者的代码

topicConnection =topicConnectionFactory.createTopicConnection();
topicSession = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
topicSubscriber = topicSession.createSubscriber(topic);
topicListener = new MsgListener();
topicSubscriber.setMessageListener(topicListener);
topicConnection.start();