Вызов кода на языке Python из продукта IBM InfoSphere Streams

Источник: IBM

Корпоративные разработчики широко применяют язык программирования Python для быстрого построения работающих решений. Многие компании применяют Python при создании ИТ-активов для регулярного использования. IBM InfoSphere Streams - это программный продукт связующего уровня, допускающий реализацию логики непосредственно на языках C++ и Java. В предлагаемой статье описывается, как непосредственно из приложений IBM InfoSphere Streams вызывать программный код, написанный на языке Python.

Обзор

Продукт IBM InfoSphere Streams представляет собой программное решение связующего уровня для высокопроизводительной обработки событий в реальном времени. Уникальная мощь этого решения обуславливается его способностью получать структурированные и неструктурированные данные из множества источников данных с целью выполнения анализа в реальном времени. Эта способность обеспечивается сочетанием простого в использовании языка разработки приложений под названием SPL (Streams Processing Language) и распределенной платформы исполнения. Кроме того, это решение связующего уровня предоставляет гибкую инфраструктуру разработки приложений, которая интегрировать в приложения продукта Streams (далее - Streams-приложения) программный код, написанный на языках C++ и Java. Многие разработчики, занимающиеся созданием ИТ-активов для "реального мира", в дополнение к языкам C++ и Java применяют т.н. динамические языки программирования. Язык Python обладает отлично интегрируется с различными системами, благодаря чему многие компании активно применяют его для быстрого построения решений. Что касается уже существующих активов, написанных на языке Python, то на этот случай имеется способ интеграции Python-кода внутрь Streams-приложений. В данной статье такая интеграция подробно объясняется на простом примере Streams-приложения.

Предполагается, что читатель знаком с продуктом InfoSphere Streams и с применяемой в нем моделью программирования на основе SPL. Кроме того, для понимания приемов программирования необходимо практическое знание языков C++ и Python.

InfoSphere Streams - это ключевой компонент в стратегии IBM по созданию платформы для работы с большими данными. Многие нынешние и потенциальные заказчики IBM, обладающие активами на языке Python (и соответствующими навыками), смогут получить дополнительный эффект посредством сочетания этих активов с продуктом InfoSphere Streams. Эта статья ориентирована на читателей, занимающихся приложениями для работы с большими данными, включая проектировщиков, разработчиков и архитекторов таких приложений.

Учебный сценарий

При объяснении практически значимых технических подробностей, связанных с вызовом программного кода на языке Python (далее - Python-кода) из Streams-приложения в статье применяется следующий простой пример. Сценарий этого примера включает чтение имен из нескольких веб-адресов, содержащихся во входном CSV-файле, и вызов простой пользовательской Python-функции, которая в качестве результата своей работы возвращает перечисленные ниже данные. Затем этот результат для каждого веб-адреса записывается в отдельный выходной CSV-файл.

  • Основное имя хоста в URL-адресе
  • Список альтернативных имен хоста для URL-адреса
  • Список IP-адресов для URL-адреса
  • Название компании, указанное в строке URL-адреса

Предварительные условия

Фрагменты кода, приведенные ниже, поясняют подробности реализации вышеуказанного сценария. Кроме того, читатель может загрузить используемый в качестве примера учебный код, а затем запустить его в собственной среде IBM InfoSphere Streams. Этот учебный код был протестирован в следующей среде.

  • RedHat Enterprise Linux версии 6.1 или выше (или эквивалентная версия CentOS)
  • gcc версии 4.4.5 20110214 (Red Hat 4.4.5-6) (GCC)
  • Python 2.6.6 (r266:84292, Apr 11 2011, 15:50:32; поставляется вместе с RHEL6)
  • /usr/lib/libpython2.6.so
  • Каталог /usr/include/python2.6 с файлом Python.h и другими include-файлами
  • Продукт IBM InfoSphere Streams 3.x, сконфигурированный с работающим экземпляром Streams

Описываемые в статье приемы могут работать и в среде, имеющей небольшие отличия (например, в среде на основе версий RHEL 5.8 и Streams 2.0.0.4) - при условии соответствующих изменений программного кода и настроек среды.

Высокоуровневые компоненты приложения

В нашем простом учебном сценарии используются три основных компонента. Каждый из этих компонентов достаточно независим для того, чтобы быть представленным в виде собственного проекта - в силу естественного разделения по применяющимся в этих компонентах языкам программирования.

  • Скрипт UrlToIpAddress на языке Python (Python-скрипт)
  • Проект StreamsToPythonLib на языке C++ (C-проект)
  • Проект streams-to-python на языке SPL (SPL-проект)

UrlToIpAddress- это Python-скрипт с простой логикой, которая использует API-интерфейсы Python для получения информации об IP-адресе и об имени хоста для данного веб-адреса. Этот скрипт может быть протестирован независимо c помощью интерпретатора Python. Этот крошечный скрипт играет важную роль в этой статье - с его помощью демонстрируется, как из Streams-приложения осуществляется вызов функций, находящихся в Python-скрипте.

StreamsToPythonLib- это проект на языке C++. В него включен исходный код для логики нативной SPL-функции. Этот исходный код использует преимущественно API-интерфейс Python/C для встраивания Python-кода в процессе исполнения кода на C++. Встраивание Python-кода в код на C++ подробно описано в документации по языку Python. Этот проект содержит include-файл обертки (.h), который является весьма важным, поскольку предоставляет Streams-приложению на языке SPL точку входа, позволяющую вызвать любой метод класса, написанного на C++. В этом проекте вся написанная на C++ логика будет компилироваться в файл типа shared object library (.so) и будет доступна SPL-приложению.

streams-to-python- это проект продукта Streams, написанный на языке SPL. Внутри него имеется простой граф SPL-потока для выполнения цепочки вызовов (SPL<-->C++<-->Python). Этот SPL-код читает URL-адреса из входного файла в каталоге данных, вызывает нативную функцию C++ для исполнения Python-кода, получает результаты и записывает их в выходной файл в каталоге данных. В каталоге SPL-проекта имеется XML-файл для модели нативной функции, содержащий метаинформацию, которая необходима для того, чтобы напрямую вызвать метод класса C++ из SPL-кода. Эта информация содержит следующие сведения: имя include-файла обертки (wrapper) для C++; пространство имен C++, содержащее функции-обертки; прототип функции-обертки C++, выраженный с помощью синтаксиса/типов SPL; имя совместно используемой (общей) библиотеки (shared object library), созданной из проекта C++; местоположение этой библиотеки; местоположения include-файла обертки и т. д.

В следующих разделах мы детальнее рассмотрим каждый из этих трех компонентов приложения, а также более подробно рассмотрим код на Python, C++ и SPL.

Логика на языке Python

В листинге 1 показан код на языке Python. Это именно та бизнес-логика, которую мы хотим вызвать из Streams.

Листинг 1. Листинг 1. UrlToIpAddress.py
import re, sys, socket

def getCompanyNameFromUrl(url):
    # Do a regex match to get just the company/business part in the URL.
    # Example: In "www.ibm.com", it will return "ibm".
    escapedUrl = re.escape(url)
    m = re.match(r'www\.(.*)\..{3}', url)
    x = m.group(1)
    return (x)

def getIpAddressFromUrl(url):
    # The following python API will return a triple
    # (hostname, aliaslist, ipaddrlist)
    # hostname is the primary host name for the given URL
    # aliaslist is a (possibly empty) list of alternative host names for the same URL
    # ipaddrlist is a list of IPv4 addresses for the same interface on the same host
    #
    # aliaslist and ipaddrlist may have multiple values separated by
    # comma. We will remove such comma characters in those two lists.
    # Then, return back to the caller with the three comma separated
    # fields inside a string. This can be done using the Python 
    # list comprehension.
    return(",".join([str(i).replace(",", "") for i in socket.gethostbyname_ex(url)])) 

if ((__name__ == "__main__") and (len(sys.argv) >= 2)):
    url = sys.argv[1]
    # print("url=%s" % (url, ))
    print "IP address of %s=%s" % (url, getIpAddressFromUrl(url))
    print "Company name in the URL=%s" % repr(getCompanyNameFromUrl(url))
elif ((__name__ == "__main__") and (len(sys.argv) < 2)):
    sys.exit("Usage: python UrlToIpAddress.py www.watson.ibm.com")

Очевидно, что Python-код в листинге 1 был сознательно упрощен ради ясности. Этот код содержит две Python-функции, за которыми следует фрагмент, запускающийся при исполнении Python-скрипта с помощью интерпретатора Python. Чтобы убедиться в том, что данный код работает ожидаемым образом, можно запустить этот скрипт из окна оболочки: python UrlToIpAddress.py www.watson.ibm.com.

В верхней части файла осуществляется импорт Python-модулей, в том числе для поддержки регулярных выражений и сокетов. Первая функция -getCompanyNameFromUrl- принимает на вход веб-адрес. Эта функция применяет к веб-адресу регулярное выражение с целью выделения названия компании и возвращает это название вызывающей стороне. Входным параметром следующей функции -getIpAddressFromURL- также является веб-адрес. Она вызывает интерфейс Python socket API с целью получения IP-адреса для заданного веб-адреса. В данном случае этот API-интерфейс Python (gethostbyname) возвращает кортеж (tuple) - запись, состоящую из трех элементов. Эти три элемента представляют имя хоста сервера для данного веб-адреса, альтернативные имена хоста (если таковые имеются) и один или несколько IP-адресов для этого сервера. Функция не возвращает вызывающей стороне результат типа tuple, а вместо этого формирует из трех элементов кортежа Python-строку, соединяя элементы через запятую. Затем она возвращает вызывающей стороне результат в виде строки.

Цель данного примера состоит в демонстрации вызова этих двух функций Python-скрипта изнутри Streams-приложения. Мы сосредоточимся на этом в следующих разделах.

Логика на языке C++

Продукт InfoSphere Streams поддерживает два способа включения кода, написанного на языке C++. Первый способ состоит в создании базисных Streams-операторов на C++ с целью включения бизнес-логики, написанной на C++. Другой способ состоит в исполнении любых произвольных методов класса C++ непосредственно из SPL-кода как нативных функций. В этом упражнении мы будем использовать второй способ (т. е. подход на основе нативных функций). С этой целью мы создадим отдельный проект C++ под названием StreamsToPythonLib, в котором напишем необходимый код для вызова Python-функций, рассмотренных в предыдущем разделе. Затем мы создадим библиотеку shared object library (.so), чтобы сделать этот код на C++ доступным для Streams-приложения на языке SPL.

В таблице 1 показано содержимое каталога проекта C++ под названием StreamToPythonLib.

Таблица 1. Каталог проекта C++ под названием StreamsToPythonLib
Файл Описание
StreamsToPython.h Файл класса интерфейса C++
StreamsToPython.cpp Файл класса реализации C++
StreamsToPythonWrappers.h Include-файл C++, содержащий код нативной Streams-функции
mk Скрипт, осуществляющий построение библиотеки (.so) для данного проекта C++
Листинг 2. StreamsToPython.h
#ifndef STREAMS_TO_PYTHON_H_
#define STREAMS_TO_PYTHON_H_

using namespace std;

// To avoid a redefinition compiler error, undefine the following.
#undef _POSIX_C_SOURCE
#undef _XOPEN_SOURCE
// This should be the first include file (according to Python documentation)
#include "Python.h"

// Include files that defines SPL types and functions.
#include "SPL/Runtime/Function/SPLFunctions.h"
#include <SPL/Runtime/Utility/Mutex.h>
#include <SPL/Runtime/Type/ValueHandle.h>
// Include standard C++ include files.
#include <sys/time.h>
#include <pthread.h>
#include <unistd.h>
#include <stdlib.h>
#include <sstream>

// This will allow us to access the types and functions from SPL namespace.
using namespace SPL;

// Your #define constant definitions go here.

// Class definition follows here.
namespace calling_python_from_streams {
   class GlobalStreamsToPythonSession {
      private:
         // This member variable tells us if a global
         // streams to Python caller handle already
         // exists for a given PE/process.
         boolean streamsToPythonHandleExists;

         // Following member variables are required for
         // calling Python C APIs.
         static boolean pyInitialized;
         static boolean importFailed;
         PyObject* pFunc1;
         PyObject* pFunc2;

      public:
         GlobalStreamsToPythonSession();
         virtual ~GlobalStreamsToPythonSession();

         // This method establishes StreamsToPython handle for a given PE/process.
         int32 initializeStreamsToPython();
         // This method gets the IP address of a given URL.
         boolean getIpAddressFromUrl(rstring const & url,
            rstring & primaryHostName, rstring & alternateHostNames,
            rstring & ipAddressList, rstring & companyName);

         // Get the global (Singleton) Streams to Python session object.
         static GlobalStreamsToPythonSession & getGlobalStreamsToPythonSession();
   }; 
} 

#endif /* STREAMS_TO_PYTHON_H_ */

В листинге 2 показан класс интерфейса C++. Он начинается с включения файла Python.h, что необходимо нам для осуществления вызова нативного Python-кода. Листинг содержит файлы стандартных библиотечных заголовков, а также include-файлы SPL. Следует отметить, что включение файлов SPL-заголовков и использование пространства SPL-имен позволяет нам получить доступ к SPL-типам данных изнутри C++. Многие примитивные типы данных и типы-коллекции в SPL являются представлениями эквивалентных типов данных, встроенных в C++. В разделах namespace и class декларируются переменные экземпляра и методы экземпляра. Некоторые переменные экземпляра, связанные с Python-объектом, будут рассмотрены позднее. Присутствуют декларированные прототипы для конструктора класса, для деструктора класса и для метода бизнес-логики, вызов которого будет осуществляться из SPL. В конце листинга 2 имеется статический метод getGlobalStreamsToPythonSession, который предоставляет сигнлтонный доступ к этому классу C++ из SPL-кода. В следующих разделах эти вопросы будут рассмотрены более подробно.

Листинг 3. StreamsToPython.cpp
#include "StreamsToPython.h"
#include <dlfcn.h>

namespace calling_python_from_streams {
   // Initialize the static member variables in this class.
   boolean GlobalStreamsToPythonSession::pyInitialized = false;
   boolean GlobalStreamsToPythonSession::importFailed = false;
...

Ссылка на Полный текст листинга 3 (StreamsToPython.cpp).

В листинге 3 показан класс реализации. Он начинается include-операторами для соответствующего класса интерфейса и для загрузчика динамической библиотеки. Язык Python допускает расширение и встраивание. В код на языке Python можно встроить расширение для вызова функции на языке C. Точно так же внутрь кода на C++ можно встроить код на языке Python. Основной момент в листинге 3 состоит в использовании API-интерфейса Python/C для вызова нативного Python-кода. Наш класс реализации имеет пять методов C++. Рассмотрим каждый из этих методов более подробно.

Метод-конструктор (Constructor):- При инициализации рассматриваемого класса выполняются три следующих основных действия.

  1. Задание пути Python к текущему каталогу.
  2. Инициализация интерпретатора Python, которая должна быть выполнена до использования каких-либо API-функций Python/C.
  3. Динамическая загрузка общей библиотеки libPython в пространство нашего процесса. Даже если бы эта Python-библиотека была бы загружена автоматически динамическим загрузчиком, мы все равно должны загрузить ее с помощью dlopen - благодаря этому наш Python-скрипт сможет связываться надлежащим образом с другими Python-модулями, реализованными как общие библиотеки объектов.

Метод-деструктор (Destructor): При выходе объекта описываемого класса из области действия выполняются следующие операции.

  1. Сброс переменной экземпляра, которая содержит дескриптор, необходимый для singleton-доступа к классу.
  2. Очистка дескрипторов, полученных для обеих наших Python-функций.

getGlobalStreamsToPythonSession: Как показано в листинге 2, этот метод декларирован как статический метод. Это точка входа в данный класс в случае вызова нативной Streams-функции. Мы хотим иметь лишь по одному экземпляру этого класса C++ в каждом PE-элементе Streams (Processing Element -обрабатывающий элемент), поэтому нам необходимо сопровождать сигнлтонный объект для этого класса C++. Соответственно при вызове данного конкретного метода создается статический объект этого класса, который затем возвращается вызывающей стороне. Именно так Streams-приложение способно получить статический дескриптор для объекта C++, а затем произвольным образом вызвать любой метод класса C++ с помощью этого статического дескриптора.

initializeStreamsToPython: Мы поддерживаем по одному синглтонному объекту этого класса C++ для каждого процесса, поэтому для этого класса можно поддерживать переменные состояния, которые могут совместно использоваться несколькими вызовами методов. Это важный аспект проектирования, который следует учитывать, даже если данное конкретное приложение не сохраняет состояния. Streams-приложение, в котором применяются нативные функции C++, может использовать такой метод для инициализации переменных состояния. Хорошее применение этого подхода - открытие соединения с базой данных и сохранение дескриптора этого соединения для последующего доступа к этой базе данных. Описанное в данной статье приложение попросту гарантирует, что только самый первый вызов этого метода C++ инициализирует глобальный дескриптор, который служит индикатором создания синглтонного объекта этого класса.

getIpAddressFromUrl: Это гораздо более длинный метод в этом классе C++; он содержит бизнес-логику, необходимую для вызова Python-функции и для извлечения возвращаемых значений. Инфраструктура Python предоставляет богатый набор API-интерфейсов для встраивания Python-кода в приложения, написанные на C или на C++. После инициализации интерпретатора Python в методе-конструкторе посредством Py_Initialize мы сможем использовать в этом методе другие функции API-интерфейса Python/C (далее - API-функции). При вызове этого метода в него в качестве аргумента передается веб-адрес (например, www.ibm.com; обратите внимание на то, что такая часть URL-адреса, как http://, должна быть опущена). Кроме того, этот метод принимает в качестве аргументов четыре другие строковые ссылки, посредством которых результат будет возвращен вызывающей стороне. Поскольку в этом классе C++ мы используем пространство имен SPL, нам разрешено обращаться к таким SPL-типам данных, как rstring, uint32, list и т.д. Многие SPL-типы данных получены из типов данных C++, таких как std::string, int, vector и т.д.

Самая первая задача в этом методе C++ состоит в получении действующих указателей на две нативные Python-функции, которые мы собираемся вызвать. При первом вызове этого метода мы хотим получить указатели на две Python-функции и сохранить эти указатели в переменных экземпляра pFunc1 и pFunc2. Это позволит нам повторно использовать их при последующих вызовах. Чтобы получить указатели на вышеупомянутые Python-функции, мы сначала должны импортировать Python-модуль, содержащий эти две функции. В данном случае Python-модуль - это не более чем имя файла Python-скрипта без расширения .py. Мы должны воспользоваться функцией PyString_FromString для получения строкового Python-объекта из строкового объекта C++, содержащего имя Python-модуля. Затем мы осуществляем вызов функции PyImport_Import с целью получения дескриптора для нашего Python-модуля. В случае получения ошибки от любой из API-функций Python/C мы задаем переменную экземпляра importFailed и осуществляем возврат из этого метода. Последующие вызовы этого метода C++ будут производиться только в том случае, если этому предшествовал успешный импорт Python-модуля. Вышеупомянутые ошибки API-функций Python/C могут быть обнаружены и зарегистрированы в журнале с помощью API-функций PyErr_Occurred и PyErr_Print. Кроме того, теперь пришло время представить SPLAPPTRC - API-интерфейс SPL-макросов для C++, который позволяет записывать информацию об отладке или о трассировке приложения в систему регистрации Streams. Он имеет три входных аргумента: уровень журналирования; строковый объект C++, содержащий зарегистрированное в журнале сообщение; аспект для фильтрации журнала в соответствии со спецификой приложения.

Теперь, после импорта нашего Python-модуля UrlToIpAddress, мы проверяем, что функции, которые мы собираемся вызвать, действительно существуют в этом Python-модуле. С этой целью мы передаем имя соответствующей Python-функции через API-функцию PyObject_HasAttrString. После подтверждения наличия Python-функций в Python-модуле мы можем получить указатель на соответствующую функцию с помощью API-функции PyObject_GetAttrString. После того, как мы получили действующий указатель на Python-функцию, необходимо убедиться в том, что ее действительно можно вызвать. Мы делаем это с помощью API-функции PyCallable_Check. После успешного выполнения вышеописанных шагов две наши переменные экземпляра C++ (pFunc1 и pFunc2) будут указывать на действующие и доступные для вызова Python-функции, написанные пользователем. Теперь мы можем вызвать API-фукнцию PyObject_CallFunction для исполнения нашей функции посредством передачи переменной экземпляра pFunc1 или pFunc2 вместе со списком ожидаемых аргументов этой функции. В нашем случае в качестве аргумента Python-функции мы передаем строку (веб-адрес). Соответственно вторым аргументом является символ s, указывающий, что аргумент представлен в строковом формате, а третьим аргументом является фактический веб-адрес, представленный в виде обычной C-строки. Поскольку каждая из наших функций возвращает результат в виде строки, мы с помощью API-функции PyString_AsString преобразуем возвращенный строковый объект Python в обычную C-строку. Мы сохраняем строки результатов от обеих Python-функции в наших собственных локальных переменных типа rstring. Как было указано в разделе Учебный сценарий, наша первая Python-функция возвращает результат в виде строки из трех элементов, разделенных запятыми (т. е. в формате CSV). Чтобы разделить ее на CSV-поля, мы можем вызвать стандартную функцию csvTokenize из состава SPL-инструментария и присвоить возвращенные значения непосредственно ссылкам, которые ранее были переданы вызывающей стороной в качестве аргументов метода C++. Вот что происходит при вызове Python-функций из C++.

В этом классе реализации C++ следует обратить внимание и на два других важных момента. Когда мы использовали API-функцию PyImport_Import для импорта нашего модуля UrlToIpAddress.py, как она узнала о физическом местоположении файла этого Python-скрипта? В упоминавшемся выше конструкторе C++ имеется вызов стандартного API-интерфейса POSIX, который присваивает переменной окружения PYTHONPATH значение текущего каталога посредством символа "точка". Именно благодаря этому API-функция PyImport_Import способна определить местоположение Python-скрипта и импортировать этот скрипт. В Streams-приложении текущий рабочий каталог всегда задается как подкаталог /data, доступный в каталоге SPL-проекта. Таким образом, необходимо, чтобы наш Python-скрипт было скопирован в подкаталог /data. В противном случае API-функция PyImport_Import не сможет определить местоположение нашего Python-скрипта и импортировать его. Другой важный момент в этом классе реализации C++ - широкое использование API-функции Py_DECREF. Для каждого Python-объекта ведется подсчет ссылок, который учитывает количество мест, имеющих ссылку на данный объект. Когда это количество ссылок становится равным нулю, данный объект освобождается. В языке Python манипуляции с подсчетом ссылок всегда осуществляется в явном виде. Применительно к нашему коду это означает, что всякий раз, когда мы больше не нуждаемся в валидном объекте Python, мы должны осуществить вызов API-функции Py_DECREF.

Листинг 4. StreamsToPythonWrappers.h
#ifndef STREAMS_TO_PYTHON_WRAPPERS_H_
#define STREAMS_TO_PYTHON_WRAPPERS_H_

// Include the file that contains the class definition.
#include "StreamsToPython.h"

namespace calling_python_from_streams {
   // Establish a handle to the StreamsToPython to be
   // accessed within a PE.
   inline int32 initializeStreamsToPython(void) {
      return GlobalStreamsToPythonSession::
         getGlobalStreamsToPythonSession().initializeStreamsToPython();
   }

   // Get the IP address of a given URL.
   inline boolean getIpAddressFromUrl(rstring const & url,
      rstring & primaryHostName, rstring & alternateHostNames,
      rstring & ipAddressList, rstring & companyName) {
      return GlobalStreamsToPythonSession::
         getGlobalStreamsToPythonSession().
         getIpAddressFromUrl(url, primaryHostName,
         alternateHostNames, ipAddressList, companyName);
   }
}

#endif /* STREAMS_TO_PYTHON_WRAPPERS_H_ */

В листинге 4 показан специфичный для Streams файл расширения в проекте C++ StreamsToPtyhonLib. Как указывалось выше, для того, чтобы Streams-приложение было в состоянии вызвать любой метод в классе C++, мы должны выполнить определенную дополнительную работу. Именно эта дополнительная работа выполняется в данном include-файле обертки, который содержит подставляемые функции. В начале этого файла осуществляется включение файла с интерфейсом класса C++, который был показан в листинге 2. Эти функции-обертки определены в рамках тех же границ пространства имен, как и наш реальный класс C++ в проекте StreamsToPythonLib. Streams-приложение способно вызвать любую из подставляемых функций, указанных в этом include-файле обертки. Каждая подставляемая функция получает синглтонный объект намеченного класса C++ посредством вызова статического метода getGlobalStreamsToPythonSession. Первый вызов этого статического метода осуществляет статическую инициализацию класса C++. Эта ссылка на статический объект возвращается при каждом вызове данного статического метода. Посредством получения ссылки на синглтонный объект заданная подставляемая функция-обертка теперь сможет вызвать любой метод C++, доступный в этом объекте, и передать любые возвращаемые значения обратно в Streams-приложение на языке SPL. Эта технология будет очень полезна в ваших практических Streams-проектах.

Логика на языке SPL

Теперь, после изучения используемых в этом примере компонентов Python и C++, пора переходить к базовому Streams-приложению, которое свяжет все эти компоненты в единое целое. Мы напишем короткое и понятное SPL-приложение, содержащее потоковый граф с тремя Streams-операторами, которые доступны в стандартном SPL-инструментарии.

В таблице 2 показано содержимое каталога SPL-проекта streams-to-python.

Таблица 2. Каталог SPL-проекта streams-to-python
Файл/каталог Описание
README.txt Файл с кратким описанием всего приложения
python.wrapper.example Каталог SPL-проекта
python.wrapper.example/streams_to_python.spl Простой SPL-файл, вызывающий нативную функцию C++, которая поочередно вызывает Python-функции
python.wrapper.example/native.function SКаталог нативной SPL-функции
python.wrapper.example/native.function/function.xml XML-файл, содержащий модель нативной SPL-функции
data Каталог данных SPL-приложения
data/UrlInput.csv Входной файл, содержащий тестовые веб-адреса
data/UrlToIpAddress.py Простой Python-скрипт, функции которого будут вызваны из Streams. Скрипт помещен в это место, поскольку оно является текущим рабочим каталогом для Streams-приложения
data/Expected-UrlToIpAddress-Result-Feb2013.csv CSV-файл, содержащий ожидаемые результаты этого приложения по состоянию на февраль 2013 г.
impl/lib Каталог, в который будет скопирована .so-библиотека, созданная в проекте C++. Файл модели нативной Streams-функции сконфигурирован таким образом, чтобы загружать .so-библиотеку из этого каталога
impl/include Каталог, в который будут скопированы include-файлы из вышеописанного проекта C++. Файл модели нативной Streams-функции сконфигурирован таким образом, чтобы искать include-файлы в этом каталоге
build-standalone.sh Скрипт, создающий автономный исполняемый Streams-файл, которому не требуется среда исполнения Streams
build-distributed.sh Скрипт, создающий распределенный исполняемый Streams-файл, которому требуется среда исполнения Streams
run-standalone.sh Скрипт, запускающий исполняемый файл этого приложения для автономного режима
run-distributed.sh Скрипт, запускающий исполняемый файл этого приложения для распределенного режима
stop-streams-instance.sh Скрипт, останавливающий указанный экземпляр Streams
Листинг 5. streams_to_python.spl
namespace python.wrapper.example;

composite streams_to_python {
   // Define input and output schema for this application.
   type
      InputSchema = tuple<rstring url>;
      OutputSchema = tuple<rstring url, rstring primaryHostName, 
         rstring alternateHostNames, rstring ipAddressList, rstring companyName>;
		
   graph
      // Read from an input file all the URLs for which we need to 
      // get the corresponding IP addresses.
      stream<InputSchema> UrlInput = FileSource() {
         param
            file: "UrlInput.csv";
            initDelay: 4.0;
      }

      // In the custom operator below, we will call python code to get the
      // primary host name, alternative host names, and IP addresses.
      stream<OutputSchema> IpAddressOfUrl = Custom(UrlInput) {
         logic
            onTuple UrlInput: {
               mutable rstring _primaryHostName = "";
               mutable rstring _alternateHostNames = "";
               mutable rstring _ipAddressList = "";
               mutable rstring _companyName = "";
               // Call the C++ native function that in turn will call Python functions.
               boolean result = getIpAddressFromUrl(UrlInput.url, _primaryHostName,
                  _alternateHostNames, _ipAddressList, _companyName);
						
               if (result == true) {
                  mutable OutputSchema _oTuple = {};
                  _oTuple.url = UrlInput.url;
                  _oTuple.primaryHostName = _primaryHostName;
                  _oTuple.alternateHostNames = _alternateHostNames;
                  _oTuple.ipAddressList = _ipAddressList;
                  _oTuple.companyName = _companyName;
					
                  submit(_oTuple, IpAddressOfUrl);
               }
            }
      }
		
      // Write the results to a file using FileSink.
      () as FileWriter1 = FileSink(IpAddressOfUrl) {
         param
            file: "UrlToIpAddress-Result.csv";
      }
}

Потоковый SPL-граф, показанный в листинге 5 начинается с задания пространства имен. После этого описывается основной составной SPL-объект (composite). В разделе типов описывается два типа кортежей - для входных и для выходных аргументов этого приложения. Затем базовое выражение графа заполняется тремя Streams-операторами (доступны в стандартном SPL-инструментарии). Первый из этих операторов - FileSource - читает строки входного CSV-файла в местоположении по умолчанию (подкаталог data в SPL-проекте). Кортежи, исходящие из оператора FileSource, поступают в оператор Custom, который вызывает нативную SPL-функцию (getIpAddressFromUrl), написанную на языке C++. Как указывалось выше, этот код на C++, в свою очередь, исполняет Python-функции для получения результатов для заданного веб-адреса. Значения результатов присваиваются выходному кортежу и выдаются на выход оператора Custom. И, наконец, оператор FileSink получает выходные кортежи из оператора Custom и записывает результаты в выходной CSV-файл. Следует отметить, что код нативной функции на C++ скомпилирован в виде .so-библиотеки, как объясняется ниже.

Модель функции

Листинг 6. function.xml
<?xml version="1.0" encoding="UTF-8"?>
<functionModel xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
   xmlns="http://www.ibm.com/xmlns/prod/streams/spl/function" 
   xmlns:cmn="http://www.ibm.com/xmlns/prod/streams/spl/common"
   xsi:schemaLocation=
   "http://www.ibm.com/xmlns/prod/streams/spl/function functionModel.xsd">
  <functionSet>
    <headerFileName>StreamsToPythonWrappers.h</headerFileName>
    <cppNamespaceName>calling_python_from_streams</cppNamespaceName>
    <functions>
      <function>
        <description>Initialize the Streams to Python module</description>
        <prototype>public int32 initializeStreamsToPython()</prototype>
      </function>
      <function>
        <description>Get the IP addresses for a given URL</description>
        <prototype>public boolean getIpAddressFromUrl(rstring url, 
        mutable rstring primaryHostName, mutable rstring alternateHostNames,
        mutable rstring ipAddressList, mutable rstring companyName)</prototype>
      </function>
    </functions>
    <dependencies>
      <library>
        <cmn:description>Streams to Python Shared Library</cmn:description>
        <cmn:managedLibrary>
          <cmn:lib>StreamsToPythonLib</cmn:lib>
          <cmn:libPath>../../impl/lib</cmn:libPath>
          <cmn:includePath>../../impl/include</cmn:includePath>
          <cmn:command>../../impl/bin/archLevel</cmn:command>
        </cmn:managedLibrary>
      </library>
      <library>
        <cmn:description/>
        <cmn:managedLibrary>
          <cmn:lib>python2.6</cmn:lib>
          <cmn:libPath>/usr/lib64</cmn:libPath>
          <cmn:includePath>/usr/include/python2.6</cmn:includePath>
        </cmn:managedLibrary>
      </library>
    </dependencies>
  </functionSet>
</functionModel>

В листинге 6 представлен XML-файл для модели нативной функции. В листинге 5 для SPL-кода мы увидели, что вызов нативной функции C++ осуществляется изнутри оператора Custom. Как SPL-код узнает о местоположении этого кода на C++? XML-файл с моделью нативной функции является связующим звеном между SPL-кодом и кодом на C++. В процессе компиляции SPL-кода Streams-компилятор разрешает имя функции C++ с помощью информации, которую мы предоставляем в этом XML-файле. В начале этого XML-файла мы указываем имя include-файла обертки C++, который содержит подставляемые нативные функции, показанные в листинге 4. Затем мы указываем пространство имен C++, в котором определены нативные функции C++. Далее следует XML-сегмент, в котором мы декларируем прототип для нативных функций C++. Следует отметить, что декларации прототипа специфицируются с помощью SPL-типов, соответствующих типам данных C++. Если нативная функция C++ ожидает, что аргумент функции будет передан в виде ссылки, то этот аргумент функции должен быть декларирован в прототипе функции как mutable. Если к логике нативной функции C++ возможен доступ посредством .so-файла, то в XML-файл следует включить сегмент library (библиотека). В этом случае мы должны задать имя этой библиотеки (обычно первыми тремя буквами в имени Linux-библиотеки являются буквы lib, в данном случае при задании имени библиотеки эти три буквы следует опустить; точно так же, расширение .so не является обязательным). Необходимо задать местоположение .so-файла и местоположение include-файла для общей библиотеки. Хорошая практическая рекомендация - поместить и файл общей библиотеки, и include-файлы в каталог SPL-проекта, чтобы облегчать их распространение для различных установок Streams.

Как показано выше в таблице 2, каталог SPL-проекта имеет подкаталоги impl/lib и impl/include, которые вполне подходят для этой цели. В XML-файле для модели нативной функции эти каталоги обозначены как ../../impl/lib и как ../../impl/include (где ../../ - это относительный путь к каталогу impl, который может быть разрешен исходя из местоположения XML-файла с моделью функции). Если ваше приложение поддерживается несколькими версиями Linux®, а также 32-разрядными и 64-разрядными процессорами, необходимо предоставить различные версии библиотек в отдельных каталогах. Чтобы упростить автоматизацию этого процесса, в данном примере используется скрипт оболочки (../../impl/bin/archLevel), который автоматически выбирает корректное местоположение библиотеки, исходя из версии Linux и из разрядности процессора. Если вы прочитаете скрипт archLevel, то легко поймете, каким образом это делается. И, наконец, мы сделали раздел library, чтобы обозначить нашу зависимость от библиотеки libpython2.6.so посредством указания ее имени, ее местоположения и ее include-файлов.

Сборка примера

К данной статье прилагается полный исходный код использованного в ней примера (см. раздел Загрузка). Любое Streams-приложение может быть скомпилировано в двух режимах (в автономном режиме и в распределенном режиме). В автономном режиме весь основной composite-блок SPL-приложения компилируется в одну исполняемую программу для среды Linux. В распределенном режиме этот блок компилируется в виде распределенных компонентов, сконфигурированных для исполнения на одной или на нескольких машинах. Если вы располагаете средой тестирования, которая соответствует предварительным условиям, выполните следующие шаги для построения вышеописанного примера:

  1. Получите файл streams-to-python.zip (см. раздел Загрузка).
  2. Разархивируйте этот файл в личный каталог на вашей Linux-машине, на которой уже установлен продукт Streams.
  3. Смените текущий каталог на каталог проекта C++ ~/workspace1/StreamsToPythonLib.
  4. Выполните скрипт ./mk для создания общей библиотеки.
  5. Предыдущая команда создает .so-файл и копирует его в каталог ../../impl/lib/x86_64.RHEL6, а также копирует include-файлы в каталог ../../impl/include.
  6. Смените текущий каталог на каталог SPL-проекта ~/workspace1/streams-to-python.
  7. Создайте приложение для автономного режима посредством выполнения скрипта ./build-standalone.sh.
  8. Создайте приложение для распределенного режима посредством выполнения скрипта ./build-distributed.sh.
  9. Теперь вы должны увидеть каталог ~/workspace1/streams-to-python/output, в котором присутствуют исполняемые программы для автономного режима и для распределенного режима.

Выполнение примера

Как указывалось выше, замечательная особенность Streams состоит в том, что этот продукт позволяет создавать как автономные, так и распределенные приложения без необходимости внесения изменений в исходный код. Мы воспользовались этим обстоятельством и теперь можем исполнить оба своих приложения, как описано ниже.

Автономное приложение: Эта разновидность Streams-приложения представляет собой единственную исполняемую Linux-программу, которая может функционировать без запуска и останова runtime-экземпляра Streams.

  1. Смените текущий каталог на каталог SPL-проекта ~/workspace1/streams-to-python.
  2. Запустите на исполнение скрипт ./run-standalone.sh.
  3. Перейдите к разделу "Проверка результатов" (см. ниже).

Распределенное приложение. Эта разновидность Streams-приложения, содержащего специфицированные в потоковом SPL-графе Streams-операторы, компилируется в виде нескольких PE-элементов (Processing Element). Эти PE-элементы распределяются как отдельные Linux-процессы для исполнения на нескольких процессорных ядрах или на кластере из нескольких машин. Чтобы выполнить Streams-приложение для распределенного режима, требуется запустить экземпляр Streams, представить это приложение как задание для этого экземпляра, собрать результаты и остановить экземпляр Streams.

  1. Убедитесь в том, что вы уже создали экземпляр Streams.
  2. Смените текущий каталог на каталог SPL-проекта ~/workspace1/streams-to-python.
  3. Выполните этот скрипт с аргументом командной строки: ./run-distributed.sh -i YOUR_STREAMS_INSTANCE_NAME.
  4. На предыдущем шаге необходимо передать скрипту в качестве аргумента имя вашего экземпляра Streams.
  5. Наше приложение является весьма простым, поэтому оно быстро закончит свою работу. Подождите примерно 60 секунд.
  6. Теперь можно остановить экземпляр Streams, выполнив следующий скрипт: ./stop-streams-instance.sh -i YOUR_STREAMS_INSTANCE_NAME.
  7. Перейдите к разделу "Проверка результатов" (см. ниже).

Проверка результатов: Независимо от того, какое приложение вы выполняли - автономное или распределенное, логика нашей SPL-программы читает веб-адреса из входного CSV-файла (data/UrlInput.csv) по одной строке за один раз. Эта логика вызывает нативную функцию C++ для получения сетевой информации о данном веб-адресе, а затем записывает результаты в выходной CSV-файл (data/UrlToIpAddress-Result.csv). Во входном CSV-файле этого примера уже хранятся следующие веб-адреса.

  • www.ibm.com
  • www.stanford.edu
  • www.cnn.com
  • www.ieee.org
  • www.facebook.com
  • www.yahoo.com

Если наше автономное или распределенное приложение работало корректно, то вы увидите результаты в файле data/UrlToIpAddress-Result.csv. Полученные вами результаты должны выглядеть примерно так же, как поставляемые вместе с этим примером результаты тестового прогона, сделанного в процессе написания этой статьи (data/Expected-UrlToIpAddress-Result-Feb2013.csv). Ожидаемые результаты показаны ниже. Результат для заданного веб-адреса содержит пять разделенных запятыми полей в следующем формате: веб-адрес, основное имя хоста, альтернативные имена хоста, IP-адреса, название компании.

"www.ibm.com","www-int.ibm.com.cs186.net","['www.ibm.com']","['129.42.58.158']","ibm"
"www.stanford.edu","www-v6.stanford.edu","['www.stanford.edu']","['171.67.215.200']","stanford"
"www.cnn.com","cnn-lax-tmp.gslb.vgtf.net","['www.cnn.com' 'www.cnn.com.vgtf.net']","['157.166.240.11' '157.166.240.13' '157.166.241.10' '157.166.241.11']","cnn"
"www.ieee.org","e1630.c.akamaiedge.net","['www.ieee.org' 'www.ieee.org.edgekey.net']","['72.247.70.198']","ieee"
"www.facebook.com","star.c10r.facebook.com","['www.facebook.com']","['66.220.158.27']","facebook"
"www.yahoo.com","ds-any-fp3-real.wa1.b.yahoo.com","['www.yahoo.com' 'fd-fp3.wg1.b.yahoo.com' 'ds-fp3.wg1.b.yahoo.com' 'ds-any-fp3-lfb.wa1.b.yahoo.com']","['98.139.183.24']","yahoo"

Заключение

За два последних десятилетия язык Python существенно эволюционировал. В качестве динамического языка программирования он имеет множество активных приверженцев, представляющих самые различные организации - от университетов до известных во всем мире компаний. Основными причинами успеха Python на фоне других ведущих языков программирования, таких как C++, PHP и Java, многие считают простоту его использования и повышение продуктивности программиста.

IBM InfoSphere Streams - это лучшая на сегодняшнем рынке платформа обработки событий, предоставляющая превосходные возможности для анализа больших данных. Она включает мощную, гибкую и расширяемую модель программирования на основе собственного языка SPL (Streams Processing Language), который предоставляет готовые к применению средства для интеграции с бизнес-логикой, написанной на языках C++ и Java.

Данная статья посвящена объединению наилучших возможностей двух миров - SPL и Python. В статье был представлен способ беспрепятственного встраивания аналитического кода, написанного на Python, в Streams-приложения с целью использования мощных возможностей этого продукта в таких аспектах, как масштабирование и распределенная обработка. В дополнение к сведениям по интеграции Streams и Python в статье были описаны механизмы, задействованные в вызове любых произвольных методов классе C++ непосредственно из SPL-кода.

И, наконец, было показано, как осуществить циклическую цепочку вызовов между тремя языками (SPL<-->C++<-->Python). Статья подтвердила корректность описываемых концепций с помощью полностью работоспособного учебного кода (см. раздел Загрузка). Этот пример кода можно использовать как автономное Linux-приложение или как распределенное Streams-приложение.

Загрузка

SPL, C++, and Python

streams-to-python.zip  26 КБ


Страница сайта http://test.interface.ru
Оригинал находится по адресу http://test.interface.ru/home.asp?artId=35631