(495) 925-0049, ITShop интернет-магазин 229-0436, Учебный Центр 925-0049
  Главная страница Карта сайта Контакты
Поиск
Вход
Регистрация
Рассылки сайта
 
 
 
 
 

Thread-safe структуры данных .NET 4 (ч.2)

Источник: thevista

BlockingCollection<T> - эта потокобезопасная коллекция называется блокирующей, поскольку действует по следующему принципу:

  • Если коллекция пустая, и некоторый код пытается извлечь элемент, то поток, в котором он выполняется, блокируется до появления хотя бы одного элемента;
  • Если коллекция уже содержит максимальное число элементов, то поток, в котором пытаются добавить новый элемент, блокируется до тех пор, пока место не будет освобождено.

Разумеется, это далеко не полное описание её возможностей. Давайте с ними ознакомимся. Начнём с базовых вещей. Изменение содержимого коллекции производится при помощи специальных методов:

  Блокирующие методы Неблокирующие методы

Добавление
Add(T);
Add(T, CancellationToken);
TryAdd(T)
TryAdd(T, Int32)
TryAdd(T, TimeSpan)
TryAdd(T, Int32, CancellationToken)
Извлечение
Take()
Take(CancellationToken)
TryTake(out T)
TryTake(out T, Int32)
TryTake(out T, TimeSpan)
TryTake(out T, Int32, CancellationToken)

Как видно, для добавления и извлечения элементов есть в т.ч. и неблокирующие методы (те, что начинаются с префикса"Try"). Т.е. возможен неблокирующий доступ к коллекции. При этом Try* метод вернет false, если элемент невозможно добавить или извлечь, и true в противном случае. Для обоих типов методов есть перегруженные версии с маркером отмены, с помощью которого можно асинхронно прекратить ожидание.

BlockingCollection<T> имеет механизм "завершения". Коллекция считается "завершенной", когда известно, что данные в неё добавляться больше не будут. После "завершения" все попытки добавить данные приведут к генерации исключения InvalidOperationException. Если попытаться извлечь данные из пустой "завершенной" коллекции, то также будет сгенерировано исключение. Сразу же после создания коллекция является "незавершенной", а её "завершение" выполняется посредством вызова метода CompleteAdding(). Таким образом, изменение состояния коллекции выполняется вручную. Зачем был придуман такой механизм? С его помощью обеспечивается синхронизация работы производителя и потребителя, т.е. участков кода, добавляющих и извлекающих данные из коллекции. Производитель может известить о том, что новые элементы он добавлять больше не будет. При этом потребитель будет знать, что не стоит ожидать пополнения коллекции. Ниже приведен пример, демонстрирующий подобный подход.

Давайте рассмотрим немного кода, работающего с BlockingCollection<T>. Сначала создадим экземпляр коллекции, причем в конструкторе укажем её максимальную ёмкость - в данном случае это 10 элементов. Впрочем, размер можно явно не задавать, вызвав конструктор без параметров - в таком случае коллекция будет расти "неограниченно":

Код:
// создаём коллекцию
BlockingCollection<int> collection = new BlockingCollection<int>(10);

Далее в коллекцию добавим несколько элементов, и на консоль выведем свойства коллекции (о которых чуть позже):

Код:
collection.Add(200);collection.Add(300);collection.Add(400);collection.Add(500); Console.WriteLine("Count: " + collection.Count);Console.WriteLine("BoundedCapacity: " + collection.BoundedCapacity);Console.WriteLine("IsCompleted: " + collection.IsCompleted);Console.WriteLine("IsAddingCompleted: " + collection.IsAddingCompleted);Console.WriteLine();

После этого запустим таймер, в делегате callback которого происходит добавление элементов и "завершение" коллекции:

Код:
Timer timer = new Timer(delegate
{
    Console.ForegroundColor = ConsoleColor.Red;    Console.WriteLine(DateTime.Now.ToLongTimeString() + " Добавление " + " 600");    collection.Add(600);    Thread.SpinWait(300000000);     Console.ForegroundColor = ConsoleColor.Red;    Console.WriteLine(DateTime.Now.ToLongTimeString() + " Добавление " + " 700");    collection.Add(700);     collection.CompleteAdding();
},
  null, 3000, Timeout.Infinite);

Делегат, указанный при создании таймера, будет вызван в потоке, отличном от того, в котором работает метод Main() - это позволит промоделировать доступ к коллекции со стороны нескольких потоков. Дальше в цикле начинаем перебор элементов (чтобы отличить вывод от разных потоков, используется различный цвет текста):

Код:
foreach (var item in collection.GetConsumingEnumerable())
{
    Console.ForegroundColor = ConsoleColor.Gray;
    Console.WriteLine(DateTime.Now.ToLongTimeString() + " Получение " + item);
}

Затем снова выведем свойства коллекции на консоль. Вот скриншот с результатами работы программы:

Работает этот пример следующим образом. Создаётся коллекция и в неё помещается 4 элемента. После чего запускается таймер, и начинают одновременно работают 2 потока:

  • основной, где выполняется метод Main(). Здесь элементы изымаются из коллекции в цикле foreach;
  • поток таймера, где выполняется делегат callback таймера. Здесь элементы добавляются;

Те четыре элемента, которые были добавлены в самом начале, извлекаются за время менее 1 секунды. После этого основной поток блокируется, ожидая добавления новых элементов. Таймер запускается с задержкой в 3 секунды - и это видно на скриншоте: элемент 500 был извлечен в момент 06 сек., а элемент 600 был добавлен в 09 сек. В ту же 09 секунду цикл в основном потоке разблокируется, и выбирает только что добавленный элемент 600. В это время в потоке таймера создаётся искусственная задержка за счёт вызова SpinWait(). По её истечении в коллекцию добавляется элемент 700, который тут же извлекается в цикле основного потока. В заключении в потоке таймера происходит вызов collection.CompleteAdding(), который запрещает дальнейшее добавление элементов, т.е. "завершает" коллекцию. Поскольку к тому моменту коллекция пуста, и ожидание новых элементов становится бессмысленным, в основном потоке блокировка снимается и происходит выход из цикла foreach. Если закомментировать строчку с вызовом метода CompleteAdding(), то основной поток программы блокируется навсегда, ведь он будет ожидать поступления новых элементов, а поток таймера тем временем завершит работу. Т.о. выполнение "завершения" позволяет синхронизировать два параллельно работающих потока, и предотвратить подобное развитие событий.

Что касается свойств коллекции, то здесь наиболее интересны следующие:

  • IsAddingCompleted - возвращает true, если коллекция помечена как завершенная, т.е. дальнейшее добавление элементов запрещено, false в противном случае;
  • IsCompleted - возвращает true, если коллекция завершенная и все элементы выбраны, иначе false;
  • BoundedCapacity - указанная при создании емкость. Возвращает -1, если коллекция "безразмерная";

Ещё раз взглянув на скриншот, можно заметить, что в начале, когда в коллекцию добавили несколько элементов, оба логических свойства равны false, что вполне ожидаемо, ведь коллекция не "завершена". В конце работы, напротив, оба они равны true, поскольку в потоке таймера мы объявили коллекцию "завершенной" (т.е. IsAddingCompleted = true), а в основном потоке выбрали из неё все элементы (т.е. IsCompleted = true).

Возникает законный вопрос - а что с порядком элементов? Почему элементы извлекаются по принципу FIFO - это что, принцип работы коллекции? Или можно как-то на это повлиять? Ответ на эти вопросы даст более подробное рассмотрение перегруженного конструктора коллекции. В одной из его модификаций есть возможность указать ссылку на хранилище элементов - экземпляр класса, реализующего IProducerConsumerCollection<T>. Среди имеющихся в арсенале .NET 4 beta 2 средств это следующие структуры данных:

  • ConcurrentStack<T> - потокобезопасный стек;
  • ConcurrentQueue<T> - потокобезопасная очередь;
  • ConcurrentBag<T> - потокобезопасная неупорядоченная коллекция, допускающая дубликаты;

Т.е. структура данных BlockingCollection<T> реализует не хранилище, а только способ доступа с блокировкой и "завершением". Причем по умолчанию для хранения элементов используется очередь ConcurrentQueue<T>, отсюда и соблюдение принципа FIFO при работе с элементами. В рассмотренном выше примере мы можем заменить вызов конструктора следующим:

Код:
BlockingCollection<int> collection =
                 new BlockingCollection<int>(new ConcurrentStack<int>(), 10);

т.е. использовать стек. Тогда получим такой результат:

Очевидно, что порядок извлечения элементов поменялся, и теперь он соответствует LIFO, что характерно для стека. Т.о. есть возможность реализовать хранилище данных, наиболее подходящее в данном конкретном случае, и использовать его в BlockingCollection<T>. Для этого достаточно наследовать интерфейс IProducerConsumerCollection<T>:

Код:
public interface IProducerConsumerCollection<T> : IEnumerable<T>, ICollection, IEnumerable
{
    void CopyTo(T[] array, int index);
    T[] ToArray();
    bool TryAdd(T item);
    bool TryTake(out T item);
}

В заключение хочу отметить несколько любопытных статических методов класса BlockingCollection<T>. С их помощью можно добавлять или извлекать элементы, взаимодействуя с любой из коллекций, переданных в качестве параметра. Традиционно, есть 2 версии этих методов:

  • Блокирующие:
    int BlockingCollection<T>.AddToAny(BlockingCollection<T>[], T);
    int BlockingCollection<T>.TakeFromAny(BlockingCollection<T>[], out T);
  • Неблокирующие:
    int BlockingCollection<T>.TryAddToAny(BlockingCollection<T>[], T);
    int BlockingCollection<T>.TryTakeFromAny(BlockingCollection<T>[], out T);

Как видим, они принимают массив коллекций, и параметр - добавляемый элемент или out-параметр - извлекаемый. Возвращают эти методы индекс коллекции в массиве, в которую добавили данные, или, соответственно, извлекли. При этом блокирующие методы ждут появления свободного места в коллекции или нового элемента, блокируя при этом поток, в котором выполняются. Неблокирующие, в зависимости от вызванной перегруженной версии, сразу возвращают управление, либо "пытают счастья" указанный таймаут. Разумеется, в списке выше приведены не все сигнатуры методов - есть, например, принимающие маркер отмены, CancellationToken, подробнее лучше посмотреть MSDN.

Комментарии к 1-й части этого обзора побудили меня внимательнее отнестись к рассмотрению возвращаемых методами значений, и не зря. Изучая статические методы, я подумал - когда, например, TakeAnyFrom() вернёт -1? И, честно говоря, так и не смог придумать сценария. Если все коллекции массива-параметра пусты - он будет вечно ждать их пополнения или "завершения". Однако, если хотя бы одну из них завершить, получим ArgumentException. Если же положить в хотя бы одну коллекцию данные, метод вернёт её индекс и в out-параметре те самые данные. Рефлектор подсказал, что на самом деле все статические методы BlockingCollection<T> вызывают внутри один и тот же метод. Но я так и не понял, как при задании бесконечного таймаута неблокирующий метод может вернуть -1, поэтому переадресовал вопрос на форум MSDN. Для тех, кто заинтересовался, топик здесь, а вот тут соответствующая тема на MS Connect. Пока сошлись на том, что это просто ошибка в документации - копипаст с описания метода TryTakeAnyFrom().

Другие примеры использования BlockingCollection<T> можно найти на сайте MSDN Code Gallery. Там имеется страничка с исходными кодами, на которых демонстрируется использование тех или иных средств распараллеливания кода, входящих в .NET 4. На момент публикации примеры есть только для .NET 4 beta 1, но скоро опубликуют комплект и для beta 2. В составе этих примеров есть, например, набор методов-расширений, позволяющий "преобразовать" данную коллекцию в объект типа IProducerConsumerCollection<T>.

Хочу обратить внимание, что наиболее актуальная реализация рассматриваемой коллекции находится в составе .NET 4 beta 2, который стал доступен на днях. Если возникнет желание опробовать её в деле, рекомендую установить Visual Studio 2010 beta 2. По сравнению с прошлогодней июньской CTP-версией, работающей под .NET 3.5, некоторые методы были переименованы. Впрочем, это касается всей библиотеки Parallel Extensions. Полное описание BlockingCollection<T> можно найти в соответствующем разделе MSDN.

Ссылки по теме


 Распечатать »
 Правила публикации »
  Написать редактору 
 Рекомендовать » Дата публикации: 12.01.2010 
 

Магазин программного обеспечения   WWW.ITSHOP.RU
Microsoft 365 Apps for business (corporate)
Microsoft Office для дома и учебы 2019 (лицензия ESD)
Microsoft Office 365 Персональный 32-bit/x64. 1 ПК/MAC + 1 Планшет + 1 Телефон. Все языки. Подписка на 1 год.
Microsoft Office 365 Профессиональный Плюс. Подписка на 1 рабочее место на 1 год
Microsoft Windows Professional 10, Электронный ключ
 
Другие предложения...
 
Курсы обучения   WWW.ITSHOP.RU
 
Другие предложения...
 
Магазин сертификационных экзаменов   WWW.ITSHOP.RU
 
Другие предложения...
 
3D Принтеры | 3D Печать   WWW.ITSHOP.RU
 
Другие предложения...
 
Новости по теме
 
Рассылки Subscribe.ru
Информационные технологии: CASE, RAD, ERP, OLAP
Безопасность компьютерных сетей и защита информации
Новости ITShop.ru - ПО, книги, документация, курсы обучения
Программирование на Microsoft Access
CASE-технологии
Программирование в AutoCAD
СУБД Oracle "с нуля"
 
Статьи по теме
 
Новинки каталога Download
 
Исходники
 
Документация
 
 



    
rambler's top100 Rambler's Top100