Переведено БНТП по заказу Interface Ltd.
В статье рассматриваются вопросы, связанные с работой менеджера ресурсов (Resource Manager) базы данных RDM Server.. Содержится объяснение специальных функций Resource Manager, категорий API (таких как, функции управления потоками и функции организации очередей) и предоставляет подробный пример, иллюстрирующий использование конкретных функций Resource Manager. Предназначена программистам, потенциально заинтересованным в приобретении базы данных RDM Server.
Разработка многопотоковых приложений предполагает использование функций операционных систем для управления потоками и обеспечения межпотоковой синхронизации и коммуникации. Многие современные операционные системы поддерживают такие функции, но поскольку доступ к ним зависит от конкретной платформы, нередко приходится сталкиваться со значительными трудностями при создании сложных, платформенно-независимых многопотоковых прикладных программ. Если разрабатываемое вами приложение не является платформенно-независимым, вы можете свободно применять соответствующие функции вашей операционной системы. Однако, если вы создаете приложение, масштабируемое для большого числа различных платформ, вам следует воспользоваться интерфейсом Resource Manager (RM) базы данных RDM Server. Многопотоковое ядро RDM Server использует все функции интерфейса RM. Это гарантирует их надежность и доступность для всех платформ, поддерживаемых RDM Server.
Resource Manager предоставляет платформенно-независимый интерфейс с большим количеством функций, позволяющих решать такие сложные задачи для операционных систем, как:
Все функции RM предназначены для работы в однопроцессной среде многопотоковых приложений и не работают для многопроцессных приложений. Они могут использоваться в дополнительных модулях RDM Server или специализированных серверах базы данных с ядром RDM Server.
Полный список функций интерфейса RM приводится в следующей таблице.
Таблица 1 – Список функций Resource Manager
Функция | Класс | Описание |
rm_startup | общего назначения | Запуск менеджера ресурсов. |
rm_cleanup | общего назначения | Остановка и очистка менеджера ресурсов. |
rm_sleep | общего назначения | Приостановка потока на указанное число миллисекунд. |
rm_log | общего назначения | Запись сообщения в журнал сообщений RDM Server. |
rm_getenv | общего назначения | Получение строки описания среды операционной системы. |
rm_interrupt | общего назначения | Разрешение обработки прерываний работы пользователей. |
rm_threadBegin | управления потоками | Начало выполнения нового потока. |
rm_threadEnd | управления потоками | Завершение выполнения потока. |
rm_syncCreate | синхронизации | Создание объекта синхронизации. |
rm_syncDelete | синхронизации | Удаление объекта синхронизации. |
rm_syncEnterExcl | синхронизации | Вход в участок программы с монопольным доступом (семафор взаимного исключения). |
rm_syncExitExcl | синхронизации | Выход из участка программы с монопольным доступом (семафор взаимного исключения). |
rm_syncStart | синхронизации | Запуск события (установка семафора событий в несигнальное (занятое) состояние). |
rm_syncResume | синхронизации | Возобновление события (семафор событий в сигнальном состоянии). |
rm_syncWait | синхронизации | Ожидание события, которое должно возобновится (сигнальное состояние). |
rm_syncEnableQ | синхронизации | Включение семафора взаимного ожидания (допуск считывателей к общим данным). |
rm_syncDisableQ | синхронизации | Выключение семафора взаимного ожидания (блокировка считывателей). |
rm_syncSendQ | синхронизации | Отправка уведомления о завершении считывания на семафор взаимного ожидания. |
rm_syncReceiveQ | синхронизации | Получение уведомления о разрешении считывания на семафоре взаимного ожидания. |
rm_syncWaitQ | синхронизации | Ожидание на семафоре взаимного ожидания окончания работы всех считывателей. |
rm_queueCreate | организации очередей | Создание очереди сообщений. |
rm_queueDelete | организации очередей | Удаление очереди сообщений. |
rm_queuePurge | организации очередей | Удаление сообщений из очереди. |
rm_queueWrite | организации очередей | Запись сообщений в очередь. |
rm_queueRead | организации очередей | Считывание сообщений из очереди. |
rm_createTag | динамической памяти | Создание тэга динамически выделенной памяти. |
rm_resetTag | динамической памяти | Сброс буфера нелокального перехода указанного тэга. |
rm_freeTagMemory | динамической памяти | Освобождение всей памяти, выделенной для тэга. |
rm_getMemory | динамической памяти | Выделение блока памяти. |
rm_cGetMemory | динамической памяти | Выделение и очистка блока памяти. |
rm_freeMemory | динамической памяти | Освобождение выделенного блока памяти. |
rm_extendMemory | динамической памяти | Расширение (или перераспределение) блока памяти. |
rm_growMemory | динамической памяти | Увеличение блока памяти (если необходимо). |
rm_allocPool | динамической памяти | Выделение пула памяти. |
rm_getFromPool | динамической памяти | Получение буфера из пула. |
rm_putInPool | динамической памяти | Внесение буфера в пул. |
rm_zFreePool | динамической памяти | Освобождение пула памяти. |
rm_fileOpen | управления файлами | Открытие/Создание файла. |
rm_fileClose | управления файлами | Закрытие файла. |
rm_filePrintf | управления файлами | Запись в файл форматированного вывода. |
rm_fileSeek | управления файлами | Поиск абсолютной позиции в файле. |
rm_fileRead | управления файлами | Считывание из файла. |
rm_fileWrite | управления файлами | Запись в файл. |
rm_fileSeekRead | управления файлами | Считывание от заданного места в файле. |
rm_fileSeekWrite | управления файлами | Запись до заданного места в файле. |
rm_fileLength | управления файлами | Определение текущей длины файла. |
rm_fileSync | управления файлами | Сброс файла на диск. |
rm_fileRemove | управления файлами | Удаление файла (перемещение в другое место). |
rm_fileRename | управления файлами | Переименование файла. |
rm_fileTempName | управления файлами | Создание уникального временного имени файла. |
Для обеспечения платформенной независимости Resource Manager базы данных RDM Server определяет некоторые встроенные типы данных и константы. Типы данных используются для атрибутов объявления функций и определений дескрипторов; константы для кодов возврата и управляющих параметров функций. Определения дескрипторов и управляющих параметров задаются в описаниях функций. Атрибуты объявления функций приводятся ниже. Эти атрибуты указываются в объявлении функции между типом данных и именем функции. Заметим, что объявления функций используются для платформ, имеющих специальные архитектурные функции, которые не столько требуют преимущества быстродействия, сколько сами предоставляют такие преимущества при своем выполнении. Таким образом, мы инкапсулировали эти платформенно-ориентированные объявления в платформенно-независимые определения. Для многих платформ эти объявления фактически пусты.
Таблица 2 – Атрибуты функций Resource Manager
Атрибут | Описание |
REXTERNAL | Задает общедоступную функцию в общей библиотеке (DLL). |
RINTERNAL | Задает частную функцию пользователя, вызываемую из отдельной общей библиотеки (DLL). |
REXTVARARG | Задает функцию формирования списков переменных аргументов. |
RTHREAD | Задает функцию управления потоками, которая будет вызвана функцией rm_threadBegin. |
Список кодов состояния, возвращаемых функциями RM, приводится в следующей таблице. Они объявлены как тип short. Заметим, что некоторые функции RM возвращают значение -1 при неудачном выполнении и значение 0 (RM_OKAY) при успешном выполнении.
Таблица 3 – Коды возврата Resource Manager
Код возврата | Описание |
RM_OKAY | Функция успешно выполнена. |
RM_TIMEOUT | Операция не может завершиться до истечения указанного времени ожидания. |
RM_QUEUE_EMPTY | Нет сообщений в очереди. |
RM_NOCATPATH | Указан неверный путь к каталогу. Необходимо указать полное имя директории, содержащей каталог RDM Server. Возвращается при выполнении функции rm_startup. |
RM_INVCATALOG | Недействительный каталог RDM Server. Либо неправильно указано полное имя файла, либо требуемый файл не найден в указанном каталоге. Возвращается при выполнении функции rm_startup. |
Интерфейс RM API предоставляет ряд функций общего назначения. Функции rm_startup и rm_cleanup являются альтернативами функциям s_startup и s_terminate. Если вы собираетесь использовать только функции интерфейса RM API, игнорируя функции базы данных RDM Server (т.е. s_ , d_ или функции SQL), то следует иметь в виду, что вызов функции rm_startup вместо s_startup требует меньше памяти и выполняется более быстро. Это объясняется тем, что s_startup запускает управляющие потоки базы данных RDM Server. В то время как функция rm_startup инициализирует только подсистему базы данных Resource Manager.
Функция rm_getenv извлекает стандартные строки, описывающие среду операционной системы. Функция rm_interrupt предоставляет доступ к обработке прерываний работы пользователей. Функция rm_sleep позволяет приостановить поток на указанное число миллисекунд.
Поток представляет собой функцию, которая может быть выполнена независимо из других потоков одного и того же родительского процесса. Любой поток родительского процесса использует вместе с другими потоками адресное пространство, глобальные переменные и ресурсы этого процесса. Каждый процесс порождается одним потоком. Если основной поток родительского процесса вызывает какую-либо функцию управления потоками, то эта функция выполняется параллельно с основным потоком. Теоретически может быть запущено любое число потоков, но на практике некоторые платформы операционных систем ограничивают количество выполняемых потоков. Слишком большое число потоков может отрицательно сказаться на быстродействии операционной системы из-за потерь, которые она несет при управлении и планировании потоков. Впрочем, эти потери могут значительно различаться как для систем, базирующихся на архитектуре компьютера, так и для конкретной операционной системы. Потоки дают значительный выигрыш в быстродействии, если они выполняются в системах с симметричной мультипроцессорной архитектурой (SMP), использующих более двух центральных процессоров.
С помощью интерфейса RM API базы данных RDM Server вы можете вызвать функцию rm_threadBegin, которая инициализирует выполнение потоков. Поток состоит из стандартной функции языка C типа void, объявленной с атрибутом RTHREAD. Вызывая rm_threadBegin, вы можете передать параметр указателя в эту функцию. Поток выполняется до тех пор, пока он не закончит работу или не вызовет функцию rm_threadEnd.
Каждый поток имеет локальную память, которая выделяется из стека процесса. Назначаемый потоку размер пространства стека задается параметром функции rm_threadBegin. Используя константу THREAD_STACK, можно задать системное значение по умолчанию.
Все потоки выполняются на одном из трех приоритетных уровней. Как правило, потокам сеансов базы данных RDM Server (т.е., потокам, выполняющим функцию s_login ) должен назначаться приоритет RM_THREAD_HIGH, как и тем потокам, для выполнения которых требуется приоритет, не меньший чем у системных потоков RDM Server. Частое использование приоритета RM_THREAD_HIGH может отрицательно сказаться на системной производительности базы данных RDM Server.
Несколько потоков не могут безопасно манипулировать глобальными переменными одновременно. По крайней мере, один из двух потоков, которые пытаются одновременно обновить одну и ту же глобальную переменную, не сможет этого сделать. В худшем случае эта ситуация может привести к фатальному сбою для некоторых систем. Чтобы обеспечить безопасное обновление глобальных переменных несколькими потоками, необходимо запустить один из трех видов синхронизации. Синхронизация взаимного исключения упорядочивает обновления, выполняемые разными потоками. Это означает, что эти обновления будут выполняться одно за другим в порядке поступления запросов. Функции rm_syncEnterExcl и rm_syncExitExcl интерфейса RM обеспечивают выполнение синхронизации этого типа посредством семафора взаимного исключения (mutex semaphore). Для защиты глобальных данных можно создавать любое количество семафоров взаимного исключения. Заметим, что доступ "только для чтения " к любой глобальной переменной безопасен и не требует упорядочивания.
Два потока, обновляющих одни и те же общие данные, в некоторых случаях можно синхронизировать с помощью семафоров событий. При использовании семафора событий один из двух потоков получает доступ к общим данным, обновляет их и сигнализирует другому потоку, находящемуся в состоянии ожидания, о том, что он закончил свою работу. Для поддержки семафора событий используются функции rm_syncStart, rm_syncResume и rm_syncWait.
Для выполнения синхронизации между несколькими считывателями и одним редактором, которые обращаются к общим данным, используется семафор взаимного ожидания. Этот семафор, с одной стороны, не допускает считывателей к общим данным во время их обновления редактором, а с другой, не позволяет редактору выполнять обновление общих данных до тех пор, пока все считыватели не прочитают эти данные.
Например, RDM Server использует семафоры взаимного ожидания для управления процессом контрольных точек. (Функции модификации кэша базы данных находятся в ожидании во время выполнения контрольной точки, и наоборот, контрольная точка не выполняется, пока не завершатся все запущенные функции модификации кэша.) Доступ к семафорам взаимного ожидания осуществляется с помощью функций rm_syncEnableQ, rm_syncDisableQ, rm_syncSendQ, rm_syncReceiveQ и rm_syncWaitQ.
Вызывая функцию rm_syncCreate, можно создавать любые перечисленные выше семафоры. Семафоры освобождаются при вызове функции rm_syncDelete.
Часто возникают ситуации, когда требуется отправить в поток необходимые данные. Функции организации очередей, предусмотренные в интерфейсе RM API, предоставляют эту возможность. Все потоки могут отправлять сообщения в другие потоки, независимо от их количества. Каждая очередь сообщений имеет собственное имя и доступна через дескриптор этой очереди. Функция rm_queueCreate присваивает очередям соответствующие дескрипторы.
Вызвав функцию rm_queueWrite, вы можете поставить сообщение в очередь. Сообщение состоит из указателя, размера блока, на который указывает этот указатель, и идентификатора сообщения (типа long). Простой механизм назначения приоритетов позволяет задавать сообщение, которое должно быть поставлено в начало очереди. Обычно сообщения в очереди обрабатываются в порядке поступления. Поток может прочитать следующее доступное в очереди сообщение при вызове функции rm_queueRead. Эта функция позволяет указать, сколько времени поток должен ожидать сообщения, которое должно быть отправлено, если очередь пуста. По истечении времени ожидания функция возвратит состояние RM_TIMEOUT. Если вы задали для времени ожидания значение RM_INDEFINITE_WAIT, поток будет ожидать до тех пор, пока следующее по времени сообщение не будет поставлено в очередь.
Resource Manager базы данных RDM Server содержит набор функций распределения тегированной памяти, которые позволяют распределять память, связанную с тэгами. Для создания тэга памяти вызовите функцию rm_createTag. При вызове этой функции вы можете предусмотреть буфер нелокального перехода (jump buffer) для функции setjmp. Если для созданного тэга окажется недостаточно доступной памяти, управление будет передано в этот буфер. Обычно в этом случае из функции верхнего уровня вызывается стандартная функция setjmp языка C (когда вам требуется передать управление этой функции, сохраняя состояние стека). При использовании буфера нелокального перехода вам не требуется проверять каждый вызов функций rm_getMemory, rm_cGetMemory, rm_extendMemory или rm_growMemory на значение NULL. Вся связанная с тэгом память может быть освобождена вызовом одной функции rm_freeTagMemory.
Можно также указать адрес функции, к которой будут обращаться функции распределения памяти интерфейса RM в случае недостатка выделяемой памяти. Эта функция, написанная вами, должна освобождать достаточное количество памяти для удовлетворения запроса. Вы можете использовать эту функцию обратного вызова для выполнения процедуры "сборки мусора".
Кроме того, существует интерфейс пула памяти, который допускает многократное использование буферов памяти с целью сокращения потерь, связанных с большим количеством процедур динамического распределения и освобождения памяти.
Некоторые операционные системы устанавливают предел на максимальное число файлов, которые процесс может одновременно открыть. Функции управления файлами, предусмотренные интерфейсом RM, отслеживают этот предел, чтобы динамически открывать и закрывать файлы операционной системы, не выходя за его рамки.
Интерфейс RM также предоставляет возможность выполнять асинхронный ввод-вывод, позволяя нескольким потокам одновременно считывать или записывать информацию в один и тот же файл. Кроме того, асинхронная запись в файл позволяет возвращать вызов записи до того, как эта запись будет завершена.
В этом разделе приводится пример программы "fastdump", которая иллюстрирует использование большинства функций Resource Manager. Это программа представляет собой утилиту многопоточного шестнадцатеричного дампа, в которой используются только функции интерфейса Resource Manager API, т.е. в ней не выполняются обращения к базе данных RDM Server. Программа запускает до 9 потоков, отвечающих за выполнение дампа. Поток основного процесса формирует очередь для отправки сообщений, запрашивающих дамп каждого блока из входного файла в форматированный выходной файл. Каждый поток считывает следующее по очереди сообщение, затем асинхронно считывает соответствующий блок из входного файла, форматирует выходной блок и асинхронно записывает этот отформатированный блок в выходной файл. После отправки сообщения, которое запрашивает дамп последнего блока, поток основного процесса посылает каждому потоку сообщение о завершении дампа. По прочтении этого сообщения поток останавливается и сигнализирует о своем завершении семафору событий, который информирует об этом поток основного процесса. После того, как все потоки отправят сигнал о своем завершении, программа прекращает работу.
Программа "fastdump" может быть выполнена с помощью следующей командной строки:
fastdump [-b размер_блока] [-t число_потоков] входной_файл выходной_файл
-b размер_блока Задает размер блока входного файла, дамп которого выполняется всеми потоками.
Если размер блока не кратен 512, то он будет усечен до размера кратного 512.
Значение по умолчанию – 4096.
-t число_потоков Задает число потоков, выполняющих дамп. Максимальное число потоков 9, по умолчанию 3 потока.
Ниже приводится начальный код основной программы "fastdump", который обрабатывает аргументы командной строки.
/* ==========================================================================
Multithreaded hex dump program
*/
void main( int argc, char *argv[] )
{
int argi;
char *inFileName, *outFileName;
short *threadSems, tno, stat;
unsigned long filelen, pos;
jmp_buf noMoreMem;
DUMPMSG *pMsg;
/* process command line options */
for (argi = 1; argi < argc && argv[argi][0] == '-'; ++argi) {
char opt = argv[argi][1];
switch ( opt ) {
ase 'b': /* set size of input file block buffer */
if ( ++argi == argc ) usage();
buffSize = (size_t)atoi(argv[argi]);
if (buffSize % 512) {
/* make sure its a multiple of 512 */
buffSize = (buffSize/512)*512;
}
break;
case 't': /* set # of dump threads (max. of 9) */
if ( ++argi == argc ) usage();
maxThreads = (short)atoi(argv[argi]);
if (maxThreads >= 10)
maxThreads = 9;
break;
default:
usage();
}
}
if ( argi < argc - 1 ) {
/* fetch file names */
inFileName = argv[argi++];
outFileName = argv[argi];
}
else
usage();
Функция usage позволяет вывести на печать информацию об использовании программы и выйти из программы. Переменные buffSize и maxThreads являются глобальными целочисленными переменными (типа size_t и short, соответственно). Следующая часть основной программы запускает Resource Manager.
/* startup RDM Server resource manager */
stat = rm_startup(NULL, MessageConsole, LOG_ERROR|LOG_WARN|LOG_INFO);
if ( stat != RM_OKAY ) {
printf("Unable to start up resource manager. Status = %d\n", stat);
exit(1);
}
Первый аргумент функции rm_startup указывает полный путь к директории каталога RDM Server. В данном случае передается значение NULL, которое позволяет извлечь путь к каталогу из переменной среды CATPATH. Директория каталога содержит файл velocis.ini, из которого Resource Manager извлекает информацию о своей конфигурации. Если переменная среды CATPATH отсутствует, то функция rm_startup возвращает код ошибки RM_NOCATPATH.
Второй аргумент функции rm_startup указывает адрес пользовательской функции журнала сообщений RDM Server. Когда он задан, база данных RDM Server вызывает указанную функцию, чтобы обработать все журнальные сообщения, которые генерируются самой базой данных RDM Server или при каждом вызове функции rm_log, исходящем из пользовательского приложения.
Третий аргумент состоит из битовой карты типов журнальных сообщений. В данном случае в нем указаны только регистрируемые сообщения типа errors (сообщения об ошибке), warnings (предупреждающие сообщения) и information (информационные сообщения). Можно также включить и другие типы сообщений, например, сообщения запуска RDM Server (LOG_STARTUP) и сообщения регистрации пользователя (LOG_LOGIN).
Теперь приведем код функции MessageConsole.
/* =========================================================================
Log console messages
*/
void REXTERNAL MessageConsole(
RDSLOG_CTRL fcn, /* тип вызова: open, close, message */
RDSLOG_CTRL fcn, /* call type: open, close, message */
short type, /* message type */
char *msg) /* message to be logged */
{
if ( fcn == RDSLOG_MESSAGE ) {
switch ( type ) {
case LOG_ERROR: printf("***ERROR: "); break;
case LOG_WARN: printf("***WARINING: "); break;
}
printf("%s\n", msg);
}
}
В этой функции нет ничего сложного. Она просто печатает сообщения об ошибке и предупреждающие сообщения на консоли сервера. Конечно, можно написать более сложные обработчики сообщений. Например, в Windows NT/2000 вы могли бы написать сообщение для журнала событий.
Следующий раздел основной программы открывает входной и выходной файлы.
/* open files */
hInFile = rm_fileOpen(inFileName, O_RDONLY|O_BINARY, RM_SHARE, 0, 0);
hOutFile = rm_fileOpen(outFileName, O_CREAT|O_RDWR|O_BINARY,
RM_SHARE|RM_DIRECTIO, 0, 0);
if ( ! hInFile || !hOutFile ) {
printf("Unable to open files\n");
rm_cleanup();
exit(1);
}
Дескрипторы файлов hInFile и hOutFile являются глобальными переменными типа RM_PFILE. Они используются всеми потоками, выполняющими дамп. Оба файла открываются как совместно используемые (RM_SHARED) двоичные (O_BINARY) файлы (не текстовые). Входной файл доступен только для чтения (O_RDONLY). Выходной файл доступен для чтения/записи (O_RDWR) (несмотря на то, что запись также применяется). Если выходного файла нет, то он создается (O_CREAT). Кроме того, выходной файл имеет флаг RM_DIRECTIO. Этот флаг указывает записи, которые должны записываться прямо на диск.
Если файлы по какой-либо причине нельзя открыть, функция rm_fileOpen возвращает значение NULL, предварительно регистрируя сообщение, указывающее на эту причину.
Следующий раздел основной программы настраивает управление динамической памятью.
/* set up memory management */
if ( setjmp(noMoreMem) ) {
rm_log(LOG_ERROR, "FATAL EXIT: out of memory");
goto shutdown;
}
/* use C malloc function to allocate a reserve threshold amount */
threshold = malloc(2*buffSize);
/* allocate memory tag, RM_USESEM => multithread allocations are serialized
*/
mTag = rm_createTag("fastdump", 0, noMoreMem, FreeReserve, 0, RM_USESEM);
/* allocate pool for queue messages */
qPool = rm_allocPool(mTag, "qPool");
Заметим, что объявления локальных переменных, которые были ранее продемонстрированы для основной программы, включают переменную noMoreMem, объявленную как тип jmp_buf (объявление исходит из #include <jmpbuf.h> под #include rds.h). Вызов стандартной функции setjmp языка С сохраняет в noMoreMem информацию о месте возвращения, передавая ее в функцию rm_createTag. Таким образом, если какой-либо вызов функции распределения памяти, связанный с mTag, не проходит из-за недостатка памяти, то будет выполняться функция longjmp для указанного jmp_buf. В результате, функция setjmp возвратит ненулевое значение с вызовом функции rm_log для сообщения о недостатке памяти, за которым последует остановка программы.
Threshold является глобальным указателем на объект неизвестного типа (global void pointer), который используется для получения зарезервированного фрагмента памяти (в данном случае, равного двум блокам). Функция FreeReserve освобождает этот фрагмент, если при запросе о выделении памяти для тега mTag не будет найдено достаточного количества памяти. После освобождения этого буфера любые последующие обращения к FreeReserve не добавляют свободной памяти, а приводят к передаче управления к функции setjmp.
Вызов функции rm_createTag содержит флаг RM_USESEM. Он необходим, поскольку позволяет упорядочить вызовы на выделение памяти для тега mTag, поступающие сразу от нескольких потоков. Если флаг RM_USESEM не будет указан, то эти вызовы могут привести (и, скорее всего, приведут) к повреждению структуры данных распределяемой памяти, что чревато нарушением общей защиты или даже бесконечным циклом. В любом случае, вам следует заранее определить, какие потоки будут выполнять извлечение и распределение памяти из каждого тега памяти. Если вы уверены, что все операции по выделению памяти для данного тега будут выполняться из одного и того же потока, то для этого флага можно указать значение RM_NOSEM. Это ускорит распределение памяти.
Если существует вероятность того, что выделение памяти для отдельного тега будет выполняться из нескольких потоков, следует использовать флаг RM_USESEM (т.е. указать на необходимость семафора взаимного исключения для защиты доступа к структурам данных распределяемой памяти для этого тега).
Большинство операций, связанных с распределением памяти, включают размещение и освобождение сообщений с запросами на дамп, которые посылаются из основного потока во все потоки, выполняющие дамп входного файла. Программа "fastdump" предусматривает использование пула памяти во избежание потерь, связанных с большим количеством операций по распределению и освобождению одинаковых по размеру блоков. Поставленные в очередь новые сообщения извлекаются из этого пула и распределяются между потоками, которые их обрабатывают. Затем, по завершении обработки, эти сообщения возвращаются обратно в пул. Вызов функции rm_allocPool выделяет дескриптор пула qPool. Первый аргумент этого дескриптора указывает на тег, в котором qPool будет выполнять операции по выделению и размещению памяти. Второй аргумент указывает имя семафора, созданного функцией rm_allocPool и используемого для упорядочивания обращений к пулу. Этот семафор имеет точно такое же назначение, что и другие описанные выше семафоры, предназначенные для многопотоковых распределений памяти. Он здесь необходим, поскольку функции пула будут вызываться сразу из нескольких потоков.
Теперь приведем код функции FreeReserve. Заметим, что аргументы этой функции включают не только тег памяти, но и запрашиваемый размер памяти.
/* =========================================================================
Insufficient memory reserve function
*/
void REXTERNAL FreeReserve(
RM_MEMTAG tag,
size_t size)
{
if ( threshold && size <= 2*buffSize ) {
/* free reserve if it has sufficient space */
free(threshold);
threshold = NULL; /* печать уведомляющего сообщения */
rm_log(LOG_WARN, "running low on memory");
}
}
После настройки управления динамической памятью основная программа создает очередь сообщений и глобальный семафор взаимного исключения, выделяет массив для семафоров завершающего события для каждого потока, и затем запускает эти потоки, чтобы выполнить дампа входного файла. Все эти операции выполняет следующий сегмент программы.
/* Create message queue for sending dump instructions to threads
*/
dumpQueue = rm_queueCreate("dumpQueue");
/* A global variable, blockCount, will be updated by the threads.
This mutex semaphore will be used to synchronize access to it.
*/
countSem = rm_syncCreate("countSem", RM_MUTEX_SEM, RM_EXCLUSIVE);
/* allocate array of event semaphores, 1 per thread */
threadSems = rm_getMemory(maxThreads*sizeof(short), mTag);
rm_log(LOG_INFO, "Launching %d dump threads", maxThreads);
for ( tno = 0; tno < maxThreads; ++tno ) {
threadSems[tno] = rm_syncCreate("doneSem", RM_EVENT_SEM, RM_EXCLUSIVE);
rm_syncStart(threadSems[tno]);
rm_threadBegin(DumpThread, THREAD_STACK, &threadSems[tno], RM_THREAD_HIGH);
}
Очередь dumpQueue используется для отправки сообщений из основного потока в потоки, выполняющие дамп. Семафор взаимного исключения countSem упорядочивает доступ к глобальной переменной blockCount типа long для ее обновления. Эта переменная увеличивается после выполнения потоками каждого запроса на дамп. Приведенный выше сегмент программы иллюстрирует способ использования семафоров взаимного исключения для защиты доступа к глобальным данным.
Массив threadSems содержит для каждого потока один семафор событий. Эти семафоры создаются циклом for, который и запускает потоки. Вызов функции rm_syncStart устанавливает семафор событий в несигнальное состояние, которое приостанавливает все последующие вызовы функции rm_syncWait для этого семафора, пока какой-либо другой поток или несколько потоков не вызовут функцию rm_syncResume. Вызов функции rm_threadBegin инициализирует выполнение указанной функции управления потоками. Второй аргумент задает размер пространства стека, выделяемого для этой функции. Константа THREAD_STACK является размером стека RDM Server по умолчанию. Стек должен вмещать аргументы и локальные переменные не только указанной функции управления потоками, но и всех функций, которые будут из нее вызываться. Если вы не определили размер стека, вам лучше всего использовать значение по умолчанию THREAD_STACK (размер этого значения определяется максимальной глубиной вызовов RDM Server). Третий аргумент функции rm_threadBegin является переменной-указателем, который передается как аргумент функции управления потоками. Вы можете задать его по своему выбору. В данном случае, мы передаем адрес семафора завершающего события. Последний аргумент функции rm_threadBegin задает приоритет потока. В данном примере мы выбрали приоритет RM_THREAD_HIGH, поскольку эта программа использует только функции Resource Manager. (Также можно выбрать значения RM_THREAD_LOW и RM_THREAD_NORMAL.) Для приложений, которые будут регистрироваться и обращаться к базе данных RDM Server, этот параметр должен устанавливать приоритет NORMAL или LOW.
После запуска потоков, отвечающих за выполнение дампа, основная программа начинает отправлять этим потокам сообщения с запросами на дамп, используя очередь dumpQueue. Следующий код выполняет разбиение дампа, создавая и настраивая сообщения с запросами на дамп, и затем вызывает функцию rm_queueWrite, организуя очередь из этих сообщений.
/* get length of file */
filelen = rm_fileLength(hInFile);
/* send dump messages to threads */
for (pos = 0; filelen > 0; pos += buffSize) {
pMsg = rm_getFromPool(sizeof(DUMPMSG), qPool);
pMsg->startpos = pos;
pMsg->blocklen = filelen >= buffSize ? buffSize : filelen;
rm_queueWrite(dumpQueue, pMsg, sizeof(DUMPMSG), 0, 0);
filelen -= buffSize;
}
Функция rm_fileLength возвращает размер входного файла в байтах, сохраняя его в локальной переменной filelen типа unsigned long (длинное целое без знака). Эта переменная используется для управления циклом for, который разбивает входной файл на сообщения с запросами на дамп, длиной buffSize (возможно, кроме последнего сообщения). Объявление типа DUMPMSG выполняется следующим образом.
typedef struct dumpmsg {
unsigned long startpos; /* byte offset from start of file */
size_t blocklen; /* length of block (= buffSize, 0 = end) */
} DUMPMSG;
Сначала из пула выделяется буфер сообщения. Затем при вызове функции rm_queueWrite это сообщение ставится в очередь dumpQueue. Три первых аргумента этой функции не требуют объяснений. Четвертый аргумент является значением типа длинное целое, которое может использоваться в качестве идентификатора сообщения. В данной программе этот аргумент используется как логический флаг, который имеет значение TRUE (или 1) после отправки последнего сообщения. Последний аргумент – булевый. Он указывает, что данное сообщение является приоритетным и, следовательно, должно быть поставлено в начало очереди. В этом примере, все сообщения имеют приоритет NORMAL и ставятся в очередь в порядке поступления.
С этого момента мы оставляем основную программу и переходим к рассмотрению функции управления потоками.
/* =========================================================================
Dump thread - dumps 1 block at a time
*/
static void RTHREAD DumpThread(void *pDoneSem)
{
short doneSem;
char *inBuff;
char *outBuff;
unsigned int len;
DUMPMSG *pMsg = NULL;
long end_of_dump = 0;
/* end-of -dump event semaphore is thread argument */
doneSem = *(short *)pDoneSem;
/* allocate input buffer for 1 dump block */
inBuff = rm_getMemory(buffSize, mTag);
/* allocate output buffer for each formatted dump line */
outBuff = rm_getMemory(77*(buffSize/16)+1, mTag);
while ( ! end_of_dump ) {
rm_queueRead( dumpQueue, &pMsg, NULL, &end_of_dump, RM_INDEFINITE_WAIT
);
if ( pMsg ) {
/* read block to be dumped */
len = rm_fileSeekRead(hInFile, pMsg->startpos, inBuff, pMsg->blocklen);
if ( len != pMsg->blocklen )
rm_log(LOG_ERROR, "error reading input file");
else
DumpBlock(inBuff, pMsg->startpos, pMsg->blocklen, outBuff);
/* update count of dumped blocks */
rm_syncEnterExcl(countSem);
++blockCount;
rm_syncExitExcl(countSem);
/* free queue message */
rm_putInPool(pMsg, qPool);
}
}
/* signal foreground that we're done */
rm_syncResume(doneSem);
/* terminate thread */
rm_threadEnd();
}
Как говорилось выше, аргумент функции управления потоками является указателем на семафор завершающего события и извлекается в переменную doneSem. Каждый поток должен иметь свой входной буфер inBuff. Вызов функции rm_getMemory выделяет определенное количество байтов buffSize в этом буфере. Для каждого входного блока форматированный вывод записывается в один выходной буфер outBuff, который затем при однократном вызове записи записывается в выходной файл. Каждая форматированная выходная строка состоит из 16 байтов дампа входного блока. Все такие строки имеют длину, равную 77 символам.
Основное тело потока выполняется внутри цикла while. Цикл завершается, когда флаг end_of_dump, возвращаемый функцией rm_queueRead, имеет значение TRUE. Аргументы функции rm_queueRead достаточно просты. Последний аргумент указывает, сколько времени (в миллисекундах) поток ожидает сообщение, которое будет записано в очередь. В данном случае поток ожидает сообщение неограниченное время (RM_INDEFINITE_WAIT).
Если аргумент ожидания имеет значение 0, то функция либо возвратит значение RM_OKAY и доставит сообщение, либо возвратит значение RM_QUEUE_EMPTY, которое указывает на отсутствие сообщений в очереди.
Возвращаемый указатель сообщения содержится в pMsg. Значение NULL для pMsg отправляется с последним сообщением (флаг end_of_dump равен 1). Вызов функции rm_fileSeekRead запускает считывание в inBuff входного блока, на который указывает последнее сообщение. Функция rm_fileSeekRead возвращает фактическое число прочитанных байтов. Если это значение (len) не равно запрошенному числу байтов, значит произошла ошибка (или вы пытаетесь выполнить считывание за меткой конца файла, что в данном случае невозможно). После успешно завершенного считывания вызывается функция DumpBlock для форматирования входного блока в выходной буфер.
Переменная blockCount представляет собой глобальную переменную типа long, которая используется для подсчета блоков с выполненными дампами. Процесс обновления этой переменной требует упорядочивания, поскольку она обновляется сразу несколькими потоками. Когда поток, обновляющий эту переменную, ждет своей очереди к семафору взаимного исключения, вызываемая им функция rm_syncEnterExcl приостанавливается. После того, как он получит управление над семафором взаимного исключения, функция rm_syncEnterExcl выполняется и поток безопасно обновляет переменную blockCount. При этом все остальные потоки, обращающиеся к функции rm_syncEnterExcl для обновления переменной countSem, ставятся в очередь. Когда управляющий поток, вызывая функцию rm_syncExitExcl, освобождает семафор взаимного исключения, управление передается следующему потоку, стоящему первым в очереди.
Освободив семафор взаимного исключения, поток вызывает функцию rm_putInPool, чтобы возвратить буфер сообщения в пул памяти.
После получения сообщения о завершении дампа из основного (высокоприоритетного) потока цикл прерывается, и поток вызывает функцию rm_syncResume, которая устанавливает семафор завершающего события для этого потока в сигнальное состояние. Обычно этот семафор остается в сигнальном состоянии до своего перезапуска, который происходит в результате вызова функции rm_syncStart. Это означает, что до тех пор пока он находится в сигнальном состоянии, любые вызовы функции rm_syncWait для этого семафора будут немедленно возвращаться.
Для завершения потока вызывается функция rm_threadEnd. Заметим, что при выполнении этой функции поток автоматически завершается. Но поскольку некоторые новые платформы RDM Server могут потребовать вызова специальной функции, завершать поток следует этим вызовом.
Теперь возвратимся к основной программе. Приведенный ниже сегмент программы иллюстрирует, как основная программа посылает потокам сообщение о завершении дампа. Не имеет значения, какому потоку поступает то или иное сообщение, поскольку после получения сообщения о завершении дампа поток не будет обрабатывать никаких дополнительных сообщений.
/* send terminate thread messages */
for (tno = 0; tno < maxThreads; ++tno)
rm_queueWrite(dumpQueue, NULL, 0, 1, 0);
/* wait for each thread to signal they're done */
for (tno = 0; tno < maxThreads; ++tno)
rm_syncWait(threadSems[tno], RM_INDEFINITE_WAIT);
rm_log(LOG_INFO, "Dump complete: %ld %d-byte blocks written to file %s.",
blockCount, buffSize, outFileName);
После отправки сообщений о завершении дампа программа выполняет цикл по всем элементам массива семафора завершающего события и вызывает rm_syncWait, ожидая завершения всех потоков. Затем, она вызывает функцию rm_log, чтобы зарегистрировать завершение дампа и сообщить общее число записанных блоков. Функция rm_log аналогична функции printf, за исключением первого аргумента, который указывает тип сообщения. Второй аргумент – спецификатор формата printf, за которым следует 0 или несколько переменных, перечисленных в этом спецификаторе. В данном случае это сообщение будет обрабатывать функция MessageConsole, которая была задана при вызове функции rm_startup.
Последняя часть программы закрывает файлы и освобождает все используемые ресурсы Resource Manager.
/* close files */
rm_fileClose(hInFile);
rm_fileClose(hOutFile);
/* delete the queue */
rm_queueDelete(dumpQueue);
/* free the message pool */
rm_zFreePool(&qPool);
/* free semaphores */
rm_syncDelete(countSem);
for (tno = 0; tno < maxThreads; ++tno)
rm_syncDelete(threadSems[tno]);
/* free allocated memory */
rm_freeTagMemory(mTag, 1);
if ( threshold )
free(threshold);
rm_cleanup();
Второй аргумент функции rm_freeTagMemory является флагом, который указывает на то, будет ли этот тег освобождаться. Поскольку тег можно многократно использовать, его освобождение не всегда требуется. Тег многократно используется, когда при однократном выполнении какой-либо функции выделяется и освобождается вся память. В этом случае, поскольку тег уже существует, требуется указать способ задания нового буфера нелокального перехода, который необходим при недостатке памяти. Это можно сделать, вызвав функцию rm_resetTag.
На этом мы завершаем рассмотрение утилиты многопоточного шестнадцатеричного дампа. Этот достаточно тривиальный пример наглядно показывает, как можно использовать большинство функции Resource Manager, которые необходимы для создания сложных прикладных программ или дополнительных модулей для сервера базы данных RDM Server.
За дополнительной информацией обращайтесь в компанию Interface Ltd.
INTERFACE Ltd. |
|