MetaQ使用范例

MetaQ下载地址:http://fnil.net/downloads/index.html

ZooKeeper下载地址:http://www.apache.org/dyn/closer.cgi/zookeeper/

入门学习地址:https://github.com/killme2008/Metamorphosis/wiki

消息中间件

    为了考虑web架构的伸缩性,扩展性及重用性,目前许多大型门户网站及大平台,如淘宝网,天猫网,京东商城,当当网,及腾讯,Facebook等电商或社交网站,均大量采用中间件的设计,中间件又细分为业务流中间件,服务中间件,消息队列中间件,缓存中间件,数据库中间件,可以这样说,中间件在整个web架构设计中占有十分重要的地位,中间件设计的好坏直接影响到大型门户网站架构水平的高低和优劣。由于时间和精力的关系,这里阿堂主要是分享下对消息队列中间件的应用认识和理解。

    消息中间件在web分布式架构设计及性能优化方面有着非常重要的地位,目前,在很多大型网门户网站和商业大平台都有及为广泛的应用,如淘宝网,天猫网,京东商城,当当网,及腾讯,Facebook等电商或社交网站,都在大量使用。常见的开源消息中间件有mom4j,OpenJMS,UBerMQ,Hermes JMS, Presumo,JORAM,JMS4Spread,Open Message Queue,FFMQ,MQSSave/MQSLoad,HornetQ,Apache Qpid,Spring AMQP,Kafka,play-rabbitmq,队列消息系统 FQueue,ActiveMQ,Somnifugi ,MantaRay,MetaQ等

消息队列的相关概念

   为了了解MetaQ的使用,下面让我们先要了解和理解其中的一些重要概念解释。
   消息生产者
        也称为Message Producer,一般简称为producer,负责产生消息并发送消息到meta服务器。
   消息消费者
        也称为Message Consumer,一般简称为consumer,负责消息的消费,meta采用pull模型,由消费者主动从meta服务器拉取数据并解析成消息并消费。
    Topic
         消息的主题,由用户定义并在服务端配置。producer发送消息到某个topic下,consumer从某个topic下  消费消息。
    分区(partition)
         同一个topic下面还分为多个分区,如meta-test这个topic我们可以分为10个分区,分别有两台服务器提供,那么可能每台服务器提供5个分区,假设服务器id分别为0和1,则所有分区为0-0、0-1、0-2、0-3、0-4、1-0、1-1、1-2、1-3、1-4。
分区跟消费者的负载均衡机制有很大关系,具体见集群和负载均衡。
      Message
           消息,负载用户数据并在生产者、服务端和消费者之间传输。
      Broker
就是meta的服务端或者说服务器,在消息中间件中也通常称为broker。
      消费者分组(Group)
消费者可以是多个消费者共同消费一个topic下的消息,每个消费者消费部分消息。这些消费者就组成一个分组,拥有同一个分组名称,通常也称为消费者集群
      Offset
      消息在broker上的每个分区都是组织成一个文件列表,消费者拉取数据需要知道数据在文件中的偏移量,这个偏移量就是所谓offset。Offset是绝对偏移量,服务器会将offset转化为具体文件的相对偏移量。详细内容参见#消息的存储结构

MetaQ简介

    MetaQ(全称Metamorphosis)是一个高性能、高可用、可扩展的分布式消息中间件,思路起源于LinkedIn的Kafka,但并不是Kafka的一个Copy。MetaQ具有消息存储顺序写、吞吐量大和支持本地和XA事务等特性,适用于大吞吐量、顺序消息、广播和日志数据传输等场景,目前在淘宝和支付宝有着广泛的应用。
MetaQ是一款完全的队列模型消息中间件,服务器使用Java语言编写,可在多种软硬件平台上部署。客户端支持Java、C++编程语言。单台服务器可支持1万以上个消息队列,通过扩容服务器,队列数几乎可任意横向扩展。每个队列都是持久化、长度无限(取决于磁盘空间大小)、并且可从队列任意位置开始消费。
     它具有的优势如下
     (1)文本协议设计,非常透明,支持类似memcached stats的协议来监控broker。
     (2)纯Java实现,从通讯到存储,从client到server都是重新实现。
     (3)提供事务支持,包括本地事务和XA分布式事务。
     (4)支持HA复制,包括异步复制和同步复制,保证消息的可靠性。
     (5)支持异步发送消息。
     (6)消费消息失败,支持本地恢复。
     (7)多种offset存储支持,数据库、磁盘、zookeeper,可自定义实现。
     (8)支持group commit,提升数据可靠性和吞吐量。(目前kafka已实现)
     (9)支持消息广播模式。
     (10)一系列配套项目:Python/Ruby/C/C++客户端、Twitter Storm的Spout、Tail4j等。

关于消息队列应用的场景,实际上在商业应用中很多网站都在大量应用。阿堂这里随更举几个例子,大家就明白了。12306铁路购票网站,想必大家耳熟能详,去年及以前由于12306网站架构设计不好,经常出现网站并发访问量大时出现网站崩溃,提示服务出错或者说网站根本登录不进去,或者说抢到票时提交没有反应了。今年12306抢票网站就采用“消息队列中间件”来重构了12306网站,采用消息队列的排队机制,抢票时会提示“只有多少张票,目前已经有多少人在排队抢购了,票源不足”,比如说某车次只有10张票了,你前面已经有11人排队抢票了,当然输到你时就没有票了。这样一方面可以缓冲瞬时大量的高并发访问购票,二来对用户来说体验也比较好,因为人家是在你前面排队了,先来先得嘛。还有小米官网抢购手机时,也是用的这种“消息队列中间件”设计,先抢先得。还有一些电商平台的“秒杀抢购”,都是在使用“消息队列中间件”来设计的web架构,这样就可以解决瞬时涌入的大量高并发访问造成的网站压力。看到上面阿堂的举例,大家应该对“消息队列中间件”在大型网站中的重要性有了一个比较感官的认识了。

测试MetaQ

  1.   下载ZooKeeper,修改相关配置文件

    (新的版本的已经提供了Windows下的启动)单机安装非常简单,只要获取到 Zookeeper 的压缩包并解压到某个目录如:/home/zookeeper-3.2.2 下,Zookeeper 的启动脚本在 bin 目录下,Linux 下的启动脚本是 zkServer.sh,在 3.2.2 这个版本 Zookeeper 没有提供 windows 下的启动脚本,所以要想在 windows 下启动 Zookeeper 要自己手工写一个,如清单 1 所示:

 setlocal 
 set ZOOCFGDIR=%~dp0%..\conf 
 set ZOO_LOG_DIR=%~dp0%.. 
 set ZOO_LOG4J_PROP=INFO,CONSOLE 
 set CLASSPATH=%ZOOCFGDIR% 

 set CLASSPATH=%~dp0..\*;%~dp0..\lib\*;%CLASSPATH% 
 set CLASSPATH=%~dp0..\build\classes;%~dp0..\build\lib\*;%CLASSPATH% 
 set ZOOCFG=%ZOOCFGDIR%\zoo.cfg 
 set ZOOMAIN=org.apache.zookeeper.server.ZooKeeperServerMain 
 java "-Dzookeeper.log.dir=%ZOO_LOG_DIR%" "-Dzookeeper.root.logger=%ZOO_LOG4J_PROP%" 
 -cp "%CLASSPATH%" %ZOOMAIN% "%ZOOCFG%" %* 
 endlocal

    在你执行启动脚本之前,还有几个基本的配置项需要配置一下,Zookeeper 的配置文件在 conf 目录下,这个目录下有 zoo_sample.cfg 和 log4j.properties,你需要做的就是将 zoo_sample.cfg 改名为 zoo.cfg,因为 Zookeeper 在启动时会找这个文件作为默认配置文件。下面详细介绍一下,这个配置文件中各个配置项的意义。

 tickTime=2000 
 dataDir=D:/devtools/zookeeper-3.2.2/build 
 clientPort=2181
  • tickTime:这个时间是作为 Zookeeper 服务器之间或客户端与服务器之间维持心跳的时间间隔,也就是每个 tickTime 时间就会发送一个心跳。
  • dataDir:顾名思义就是 Zookeeper 保存数据的目录,默认情况下,Zookeeper 将写数据的日志文件也保存在这个目录里。
  • clientPort:这个端口就是客户端连接 Zookeeper 服务器的端口,Zookeeper 会监听这个端口,接受客户端的访问请求。

当这些配置项配置好后,你现在就可以启动 Zookeeper 了,启动后要检查 Zookeeper 是否已经在服务,可以通过 netstat – ano 命令查看是否有你配置的 clientPort 端口号在监听服务。

        点击bin/zkServer.cmd

更多安装信息:http://www.ibm.com/developerworks/cn/opensource/os-cn-zookeeper/

  1. 安装启动metaq服务器

         点击bin/metaServer.bat

  1. 编写测试代码

      Producer.java

public class Producer {
    public static void main(String[] args) throws Exception {
        final MetaClientConfig metaClientConfig = new MetaClientConfig();
        final ZKConfig zkConfig = new ZKConfig();
        //设置zookeeper地址
        zkConfig.zkConnect = "127.0.0.1:2181";
        metaClientConfig.setZkConfig(zkConfig);
        // New session factory,强烈建议使用单例
        MessageSessionFactory sessionFactory = new MetaMessageSessionFactory(metaClientConfig);
        // create producer,强烈建议使用单例
        MessageProducer producer = sessionFactory.createProducer();
        // publish topic
        final String topic = "test";
        producer.publish(topic);

        BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
        String line = null;
        while ((line = reader.readLine()) != null) {
            // send message
            SendResult sendResult = producer.sendMessage(new Message(topic, line.getBytes()));
            // check result
            if (!sendResult.isSuccess()) {
                System.err.println("Send message failed,error message:" + sendResult.getErrorMessage());
            }
            else {
                System.out.println("Send message successfully,sent to " + sendResult.getPartition());
            }
        }
    }

}

AsyncConsumer.java

public class AsyncConsumer {
    public static void main(String[] args) throws Exception {
        final MetaClientConfig metaClientConfig = new MetaClientConfig();
        final ZKConfig zkConfig = new ZKConfig();
        //设置zookeeper地址
        zkConfig.zkConnect = "127.0.0.1:2181";
        metaClientConfig.setZkConfig(zkConfig);
        // New session factory,强烈建议使用单例
        MessageSessionFactory sessionFactory = new MetaMessageSessionFactory(metaClientConfig);
        // subscribed topic
        final String topic = "test";
        // consumer group
        final String group = "meta-example";
        // create consumer,强烈建议使用单例
        MessageConsumer consumer = sessionFactory.createConsumer(new ConsumerConfig(group));
        // subscribe topic
        consumer.subscribe(topic, 1024 * 1024, new MessageListener() {

            public void recieveMessages(Message message) {
                System.out.println("Receive message " + new String(message.getData()));
            }


            public Executor getExecutor() {
                // Thread pool to process messages,maybe null.
                return null;
            }
        });
        // complete subscribe
        consumer.completeSubscribe();
    }
}