понедельник, 12 декабря 2011 г.

Работаем с очередью RabbitMQ в java


Ничто так не помогает справиться с нагрузкой на web-приложение, как асинхронная обработка. Можете это цитировать :)
Одним из лучших решений для организации очередей сообщений для асинхронной обработки является на сегодня RabbitMQ. Тут я опишу как без особых проблем установить и задействовать в своём java-проекте этот замечательный инструмент.

Итак, устанавливаем:
  1. sudo aptitude install erlang
  2. sudo aptitude install rabbitmq-server
Готово.

Для работы с сервером очередей нам понадобится официальный java-клиент RabbitMQ. Берём его отсюда.
Из архива понадобится "rabbitmq-client.jar" и "commons-io-1.2.jar".

Теперь перед нами стоит классическая задача: реализовать передачу сообщения от одного java-приложения другому. При этом выключение получателя не будет приводить к потере сообщений: пока их не обработают RabbitMQ позаботится о их сохранности. Также мы реализуем многопоточную обработку сообщений получателем. Уверен, что это потребуется, ведь в реальных задачах время обработки сообщения существенно превышает время отправки.
Итак, приступим:

Первое приложение, которое отправляет сообщения: (код в основном взят отсюда)

public class MessageProduser {
 
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setUsername("guest");
        factory.setPassword("guest");
        factory.setVirtualHost("/");
        factory.setHost("127.0.0.1");
        factory.setPort(5672);
        Connection conn = factory.newConnection();
        Channel channel = conn.createChannel();
        String exchangeName = "myExchange";
        String routingKey = "testRoute";
 
        for (int i=1; i<21; i++) {
            byte[] messageBodyBytes = ("test "+i).getBytes();
            channel.basicPublish(exchangeName, routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, messageBodyBytes);
        }
        channel.close();
        conn.close();
    }
}


Тут мы открываем очередь и посылаем в неё подряд 20 сообщений. Работать это не будет, если хотя бы раз не запустить получателя сообщений. В нашем случае именно получатель создаёт очередь.
Код получателя:

import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
 
public class MessageReceiver {
 
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setUsername("guest");
        factory.setPassword("guest");
        factory.setVirtualHost("/");
        factory.setHost("127.0.0.1");
        factory.setPort(5672);
        Connection conn = factory.newConnection();
        Channel channel = conn.createChannel();
        String exchangeName = "myExchange";
        String queueName = "myQueue";
        String routingKey = "testRoute";
        boolean durable = true;
        channel.exchangeDeclare(exchangeName, "direct", durable);
        channel.queueDeclare(queueName, durable, false, false, null);
        channel.queueBind(queueName, exchangeName, routingKey);
        QueueingConsumer consumer = new QueueingConsumer(channel);
        channel.basicConsume(queueName, false, consumer);
        boolean run = true;
        while (run) {
            QueueingConsumer.Delivery delivery;
            try {
                delivery = consumer.nextDelivery();
                new MessageThread(channel, new String(delivery.getBody()), delivery.getEnvelope().getDeliveryTag()).start();
            } catch (InterruptedException ie) {
                continue;
            }
        }
        channel.close();
        conn.close();
    }
}

Тут мы ожидаем сообщение, при его получении отдаём его на обработку отдельному потоку и тут же готовы принять следующее. Код потока-обработчика предельно прост: мы просто имитируем обработку а затем удаляем обработанное сообщение из очереди.

public class MessageThread extends Thread {
 
    private Channel channel;
    private String message;
    private long tag;
 
    public MessageThread(Channel channel, String message, long tag) {
        this.channel = channel;
        this.message = message;
        this.tag = tag;
    }
 
    @Override
    public void run() {
        try {
            System.err.println("Message received " + message);
            sleep(5000); // имитируем обработку сообщения
            channel.basicAck(tag, false);
            System.err.println("Message deleted " + message);
        } catch (Exception ex) {
            ex.printStackTrace(System.err);
        }
    }   
}

Вся схема работает достаточно просто и удивительно надёжно. Вероятно вам после знакомства с этой технологией захочется усложнить схему взаимодействия: например передавать запросы в общую очередь из нескольких приложений. Это реализуется очень просто. В строке
channel.exchangeDeclare(exchangeName, "direct", durable) 
кода получателя вторым параметром указываем ключ "topic" а значением exchangeName указываем выражение вида "exchange.*". Тогда получатель увидит сообщения от отправителей с exchangeName "exchange.1",  "exchange.other"  и т.д. а также сможет различить их получив их exchangeName из очереди как свойство delivery.getEnvelope().getExchange();

1 комментарий:

  1. Этот пример кода очень помог.
    Спасибо за публикацию.

    ОтветитьУдалить