В этой статье я расскажу, как создать новое приложение Oracle CEP, которое читает данные с помощью адаптера из очереди JMS, обрабатывает их и передает на выход только те сообщения, которые удовлетворяют условию. В итоге мы придем вот к такому приложению
Начинаем работу
- Инсталлируем плагин для Oracle CEP и запускаем сервер Oracle CEP. Это детально описано - в предыдущей статье.
- Создаем новое приложение для Oracle CEP.
- Задаем параметры приложения Oracle CEP. При этом выбираем не использовать базовые приложения
- Приложение создано и мы можем приступать к настройке работы приложения.
Настройка готового и создание собственного адаптера
Основная роль адаптера - преобразование данных, поступающих из различных источников в формат событий Oracle CEP. Затем эти события передаются следующим по цепочке компонентам сети обработки событий. Обычно, адаптер - это точка входа или выхода событий в/из приложения Oracle CEP.
В поставке Oracle CEP идут следующие готовые адаптеры:
- JMS адаптера (Weblogic JMS, Oracle AQ, Websphere MQ и др.)
- HTTP publish-subscribe адаптер
Если у вас другой источник, например:
- датчики со своими API
- плоские файлы
- RSS, twitter
- Reuters, Wombat или Bloomberg
то пишется Java-класс, обрабатывающий событие. Мы рассмотрим 2 варианта получения событий: с помощью JMS и с помощью адаптера собственной разработки.
JMS
- Настраиваем очередь сообщений в соответствие с инструкцией Oracle WebLogic Server 10.3: Configure JMS Servers and Destinations (инструкция для Weblogic; для других серверов приложений - обращайтесь к документации соответствующих продуктов)
- Для конфигурации входящего JMS-адаптера, кликнем правой кнопкой в EPN Editor и выберем New->Adapter (для перехода в EPN Editor нужно кликнуть правой кнопкой файл HelloWorld.context.xml и выбрать Open in EPN Editor)
- Задаем имя адаптеру и указываем его тип - jms-inbound
- Указываем пока еще не существующий тип событий - jmsEvent. Его нужно будет создать позже, а также параметры для подключения к JMS
- Закрываем окно конфигурации JMS-адаптера. В результате должна получиться следующая картинка:
- Создаем тип ожидаемого события jmsEvent в EPN Editor на закладку Event Types - я добавил 2 поля username и message:
Создаем тип события jmsEvent
- Как было оговорено - на вход сети будут поступать события, содержащие 2 атрибуты username и message. Для эффективной работы с этими событиями создадим класс Java - cep.event.JMSEvent. В этом классе создадим 2 private поля - username и message:
- Для эти полей реализуем соответствующие методы get и set. Для этого кликнем правой кнопкой в редакторе кода и выберем Source->Generate Getter And Setter methods.
- Теперь необходимо зарегистрировать тип события в EPN Editor на закладке Event Types.
Создаем потребителя событий (Sink)
Обычно событий, поступающие из адаптера перерабатываются, просеиваются, анализируются и агрегируются. В нашем простейшем случае - мы сразу направим поток событий потребителю для вывода на экран. Выводом на экран будет заниматься специальный класс cep.sink.simpleListener.
- Создаем новый класс - cep.sink.simpleListener. Для того, чтобы включить его в качестве потребителя в сеть, необходимо в этом классе реализовать соответствующий интерфейс - StreamSink.
- Вывод на экран организовываем с помощью System.out:
2 |
public void onInsertEvent(Object event) throws EventRejectedException { |
3 |
if (event instanceof JMSEvent){ |
4 |
JMSEvent jmsEvent=(JMSEvent) event; |
5 |
System.out.println( "---------------------------------------------" ); |
6 |
System.out.format( "Username:%s Message:%s\n" ,jmsEvent.getUsername(),jmsEvent.getMessage()); |
- Ну и последнее - нужно связать выход адаптера со входом listener. Для этого в EPN Editor жмем правой кнопкой на полотне и выбираем New->Event Bean. Затем кликаем на иконке адаптера и, удерживая левую кнопку мыши, тащит стрелку на eventBean. После соединения адатера и listener стрелкой появляется ошибка (около адаптера загорается красный фонарик) - он пропадет после того, как мы свяжем ранее описанный класс с элементом сети eventBean.
- Жмем 2 раза на eventBean - в открывшемся xml-описании переименовываем объект и задаем для него класс event.sink.simpleListener:
- В результате мы имеем вот такую сеть:
Разворачиваем приложение и тестируем.
Для того, чтобы развернуть приложение на сервере:
- Запускаем сервер CEP
- Публикуем приложение. Для этого жмем правую кнопку на сервере CEP и выбираем "Add and Remove", добавляем приложение. Затем необходимо синхронизировать приложение кнопкой "Publish to Server"
- Приложение для обработки событий развернуто.
- Скачаем тестовое приложение, которое будет наполнять очередь сообщений. Скачать приложение можно по следующим ссылка:
Результат работы
В консоли Eclipse мы увидим сообщения, получаемые из очереди:
Добавляем процессор
Мы только что создали простейшее приложение:
Давайте добавим обработчик событий. Для того мы создаем 2 канала и процессор, а также коммутируем их:
Кликнем два раза по каждому каналу и укажем какой тип события следует ожидать по каждому из них:
1 |
< wlevs:channel id = "inputChannel" event-type = "jmsEvent" > |
2 |
< wlevs:listener ref = "processor" /> |
5 |
< wlevs:channel id = "outputChannel" event-type = "jmsEvent" > |
6 |
< wlevs:listener ref = "simpleOutputBean" /> |
По-умолчанию процессор настраивается так, чтобы пропускать через себя все сообщения. Настроим его таким образом, чтобы он пропускал только те сообщения в которых встречается "100". Для этого кликнем 2 раза на процессоре в EPN Editor - откроется окно настройки процессора:
Как можно видеть процессору сопоставлен 1 запрос на CQL и тот закомментирован
4 |
< query id = "ExampleQuery" > <!-- <![CDATA[ select * from MyChannel [now] ]]> --> </ query > |
Давайте раскомментируем его и зададим условия на события (используется regular expression), которые процессор будет пропускать дальше по цепочке:
4 |
< query id = "ExampleQuery" > |
5 |
<![CDATA[ select * from inputChannel [now] where message like ".*100.*" ]]> |
В результате на консоль будут выводиться только сообщения, содержащие 100:
Заключение
Приложение, которое мы только что создали обеспечивает непрерывный анализ потока поступающих событий и вычленение из него событий, интересных нам. Обработка относительно простая, но ее очень просто можно расширить до более сложной.