Ничто так не помогает справиться с нагрузкой на web-приложение, как асинхронная обработка. Можете это цитировать :)
Одним из лучших решений для организации очередей сообщений для асинхронной обработки является на сегодня RabbitMQ. Тут я опишу как без особых проблем установить и задействовать в своём java-проекте этот замечательный инструмент.
Итак, устанавливаем:
- sudo aptitude install erlang
- sudo aptitude install rabbitmq-server
Для работы с сервером очередей нам понадобится официальный java-клиент RabbitMQ. Берём его отсюда.
Из архива понадобится "rabbitmq-client.jar" и "commons-io-1.2.jar".
Теперь перед нами стоит классическая задача: реализовать передачу сообщения от одного java-приложения другому. При этом выключение получателя не будет приводить к потере сообщений: пока их не обработают RabbitMQ позаботится о их сохранности. Также мы реализуем многопоточную обработку сообщений получателем. Уверен, что это потребуется, ведь в реальных задачах время обработки сообщения существенно превышает время отправки.
Итак, приступим:
Первое приложение, которое отправляет сообщения: (код в основном взят отсюда)
public class MessageProduser { public static void main(String[] args) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setUsername("guest"); factory.setPassword("guest"); factory.setVirtualHost("/"); factory.setHost("127.0.0.1"); factory.setPort(5672); Connection conn = factory.newConnection(); Channel channel = conn.createChannel(); String exchangeName = "myExchange"; String routingKey = "testRoute"; for (int i=1; i<21; i++) { byte[] messageBodyBytes = ("test "+i).getBytes(); channel.basicPublish(exchangeName, routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, messageBodyBytes); } channel.close(); conn.close(); } }
Тут мы открываем очередь и посылаем в неё подряд 20 сообщений. Работать это не будет, если хотя бы раз не запустить получателя сообщений. В нашем случае именно получатель создаёт очередь.
Код получателя:
import com.rabbitmq.client.Connection; import com.rabbitmq.client.Channel; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.QueueingConsumer; public class MessageReceiver { public static void main(String[] args) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setUsername("guest"); factory.setPassword("guest"); factory.setVirtualHost("/"); factory.setHost("127.0.0.1"); factory.setPort(5672); Connection conn = factory.newConnection(); Channel channel = conn.createChannel(); String exchangeName = "myExchange"; String queueName = "myQueue"; String routingKey = "testRoute"; boolean durable = true; channel.exchangeDeclare(exchangeName, "direct", durable); channel.queueDeclare(queueName, durable, false, false, null); channel.queueBind(queueName, exchangeName, routingKey); QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(queueName, false, consumer); boolean run = true; while (run) { QueueingConsumer.Delivery delivery; try { delivery = consumer.nextDelivery(); new MessageThread(channel, new String(delivery.getBody()), delivery.getEnvelope().getDeliveryTag()).start(); } catch (InterruptedException ie) { continue; } } channel.close(); conn.close(); } }
Тут мы ожидаем сообщение, при его получении отдаём его на обработку отдельному потоку и тут же готовы принять следующее. Код потока-обработчика предельно прост: мы просто имитируем обработку а затем удаляем обработанное сообщение из очереди.
public class MessageThread extends Thread { private Channel channel; private String message; private long tag; public MessageThread(Channel channel, String message, long tag) { this.channel = channel; this.message = message; this.tag = tag; } @Override public void run() { try { System.err.println("Message received " + message); sleep(5000); // имитируем обработку сообщения channel.basicAck(tag, false); System.err.println("Message deleted " + message); } catch (Exception ex) { ex.printStackTrace(System.err); } } }
Вся схема работает достаточно просто и удивительно надёжно. Вероятно вам после знакомства с этой технологией захочется усложнить схему взаимодействия: например передавать запросы в общую очередь из нескольких приложений. Это реализуется очень просто. В строке
channel.exchangeDeclare(exchangeName, "direct", durable)кода получателя вторым параметром указываем ключ "topic" а значением exchangeName указываем выражение вида "exchange.*". Тогда получатель увидит сообщения от отправителей с exchangeName "exchange.1", "exchange.other" и т.д. а также сможет различить их получив их exchangeName из очереди как свойство delivery.getEnvelope().getExchange();
Этот пример кода очень помог.
ОтветитьУдалитьСпасибо за публикацию.