Thread-safe структуры данных .NET 4 (ч.2)Источник: thevista
BlockingCollection<T> - эта потокобезопасная коллекция называется блокирующей, поскольку действует по следующему принципу:
Разумеется, это далеко не полное описание её возможностей. Давайте с ними ознакомимся. Начнём с базовых вещей. Изменение содержимого коллекции производится при помощи специальных методов:
Как видно, для добавления и извлечения элементов есть в т.ч. и неблокирующие методы (те, что начинаются с префикса"Try"). Т.е. возможен неблокирующий доступ к коллекции. При этом Try* метод вернет false, если элемент невозможно добавить или извлечь, и true в противном случае. Для обоих типов методов есть перегруженные версии с маркером отмены, с помощью которого можно асинхронно прекратить ожидание. BlockingCollection<T> имеет механизм "завершения". Коллекция считается "завершенной", когда известно, что данные в неё добавляться больше не будут. После "завершения" все попытки добавить данные приведут к генерации исключения InvalidOperationException. Если попытаться извлечь данные из пустой "завершенной" коллекции, то также будет сгенерировано исключение. Сразу же после создания коллекция является "незавершенной", а её "завершение" выполняется посредством вызова метода CompleteAdding(). Таким образом, изменение состояния коллекции выполняется вручную. Зачем был придуман такой механизм? С его помощью обеспечивается синхронизация работы производителя и потребителя, т.е. участков кода, добавляющих и извлекающих данные из коллекции. Производитель может известить о том, что новые элементы он добавлять больше не будет. При этом потребитель будет знать, что не стоит ожидать пополнения коллекции. Ниже приведен пример, демонстрирующий подобный подход. Давайте рассмотрим немного кода, работающего с BlockingCollection<T>. Сначала создадим экземпляр коллекции, причем в конструкторе укажем её максимальную ёмкость - в данном случае это 10 элементов. Впрочем, размер можно явно не задавать, вызвав конструктор без параметров - в таком случае коллекция будет расти "неограниченно":
Код:
// создаём коллекцию
BlockingCollection<int> collection = new BlockingCollection<int>(10); Далее в коллекцию добавим несколько элементов, и на консоль выведем свойства коллекции (о которых чуть позже):
Код:
collection.Add(200);collection.Add(300);collection.Add(400);collection.Add(500); Console.WriteLine("Count: " + collection.Count);Console.WriteLine("BoundedCapacity: " + collection.BoundedCapacity);Console.WriteLine("IsCompleted: " + collection.IsCompleted);Console.WriteLine("IsAddingCompleted: " + collection.IsAddingCompleted);Console.WriteLine();
После этого запустим таймер, в делегате callback которого происходит добавление элементов и "завершение" коллекции:
Код:
Timer timer = new Timer(delegate
{ Console.ForegroundColor = ConsoleColor.Red; Console.WriteLine(DateTime.Now.ToLongTimeString() + " Добавление " + " 600"); collection.Add(600); Thread.SpinWait(300000000); Console.ForegroundColor = ConsoleColor.Red; Console.WriteLine(DateTime.Now.ToLongTimeString() + " Добавление " + " 700"); collection.Add(700); collection.CompleteAdding(); }, null, 3000, Timeout.Infinite); Делегат, указанный при создании таймера, будет вызван в потоке, отличном от того, в котором работает метод Main() - это позволит промоделировать доступ к коллекции со стороны нескольких потоков. Дальше в цикле начинаем перебор элементов (чтобы отличить вывод от разных потоков, используется различный цвет текста):
Код:
foreach (var item in collection.GetConsumingEnumerable())
{ Console.ForegroundColor = ConsoleColor.Gray; Console.WriteLine(DateTime.Now.ToLongTimeString() + " Получение " + item); } Затем снова выведем свойства коллекции на консоль. Вот скриншот с результатами работы программы:
Работает этот пример следующим образом. Создаётся коллекция и в неё помещается 4 элемента. После чего запускается таймер, и начинают одновременно работают 2 потока:
Те четыре элемента, которые были добавлены в самом начале, извлекаются за время менее 1 секунды. После этого основной поток блокируется, ожидая добавления новых элементов. Таймер запускается с задержкой в 3 секунды - и это видно на скриншоте: элемент 500 был извлечен в момент 06 сек., а элемент 600 был добавлен в 09 сек. В ту же 09 секунду цикл в основном потоке разблокируется, и выбирает только что добавленный элемент 600. В это время в потоке таймера создаётся искусственная задержка за счёт вызова SpinWait(). По её истечении в коллекцию добавляется элемент 700, который тут же извлекается в цикле основного потока. В заключении в потоке таймера происходит вызов collection.CompleteAdding(), который запрещает дальнейшее добавление элементов, т.е. "завершает" коллекцию. Поскольку к тому моменту коллекция пуста, и ожидание новых элементов становится бессмысленным, в основном потоке блокировка снимается и происходит выход из цикла foreach. Если закомментировать строчку с вызовом метода CompleteAdding(), то основной поток программы блокируется навсегда, ведь он будет ожидать поступления новых элементов, а поток таймера тем временем завершит работу. Т.о. выполнение "завершения" позволяет синхронизировать два параллельно работающих потока, и предотвратить подобное развитие событий. Что касается свойств коллекции, то здесь наиболее интересны следующие:
Ещё раз взглянув на скриншот, можно заметить, что в начале, когда в коллекцию добавили несколько элементов, оба логических свойства равны false, что вполне ожидаемо, ведь коллекция не "завершена". В конце работы, напротив, оба они равны true, поскольку в потоке таймера мы объявили коллекцию "завершенной" (т.е. IsAddingCompleted = true), а в основном потоке выбрали из неё все элементы (т.е. IsCompleted = true). Возникает законный вопрос - а что с порядком элементов? Почему элементы извлекаются по принципу FIFO - это что, принцип работы коллекции? Или можно как-то на это повлиять? Ответ на эти вопросы даст более подробное рассмотрение перегруженного конструктора коллекции. В одной из его модификаций есть возможность указать ссылку на хранилище элементов - экземпляр класса, реализующего IProducerConsumerCollection<T>. Среди имеющихся в арсенале .NET 4 beta 2 средств это следующие структуры данных:
Т.е. структура данных BlockingCollection<T> реализует не хранилище, а только способ доступа с блокировкой и "завершением". Причем по умолчанию для хранения элементов используется очередь ConcurrentQueue<T>, отсюда и соблюдение принципа FIFO при работе с элементами. В рассмотренном выше примере мы можем заменить вызов конструктора следующим:
Код:
BlockingCollection<int> collection =
new BlockingCollection<int>(new ConcurrentStack<int>(), 10); т.е. использовать стек. Тогда получим такой результат:
Очевидно, что порядок извлечения элементов поменялся, и теперь он соответствует LIFO, что характерно для стека. Т.о. есть возможность реализовать хранилище данных, наиболее подходящее в данном конкретном случае, и использовать его в BlockingCollection<T>. Для этого достаточно наследовать интерфейс IProducerConsumerCollection<T>:
Код:
public interface IProducerConsumerCollection<T> : IEnumerable<T>, ICollection, IEnumerable
{ void CopyTo(T[] array, int index); T[] ToArray(); bool TryAdd(T item); bool TryTake(out T item); } В заключение хочу отметить несколько любопытных статических методов класса BlockingCollection<T>. С их помощью можно добавлять или извлекать элементы, взаимодействуя с любой из коллекций, переданных в качестве параметра. Традиционно, есть 2 версии этих методов:
Как видим, они принимают массив коллекций, и параметр - добавляемый элемент или out-параметр - извлекаемый. Возвращают эти методы индекс коллекции в массиве, в которую добавили данные, или, соответственно, извлекли. При этом блокирующие методы ждут появления свободного места в коллекции или нового элемента, блокируя при этом поток, в котором выполняются. Неблокирующие, в зависимости от вызванной перегруженной версии, сразу возвращают управление, либо "пытают счастья" указанный таймаут. Разумеется, в списке выше приведены не все сигнатуры методов - есть, например, принимающие маркер отмены, CancellationToken, подробнее лучше посмотреть MSDN. Комментарии к 1-й части этого обзора побудили меня внимательнее отнестись к рассмотрению возвращаемых методами значений, и не зря. Изучая статические методы, я подумал - когда, например, TakeAnyFrom() вернёт -1? И, честно говоря, так и не смог придумать сценария. Если все коллекции массива-параметра пусты - он будет вечно ждать их пополнения или "завершения". Однако, если хотя бы одну из них завершить, получим ArgumentException. Если же положить в хотя бы одну коллекцию данные, метод вернёт её индекс и в out-параметре те самые данные. Рефлектор подсказал, что на самом деле все статические методы BlockingCollection<T> вызывают внутри один и тот же метод. Но я так и не понял, как при задании бесконечного таймаута неблокирующий метод может вернуть -1, поэтому переадресовал вопрос на форум MSDN. Для тех, кто заинтересовался, топик здесь, а вот тут соответствующая тема на MS Connect. Пока сошлись на том, что это просто ошибка в документации - копипаст с описания метода TryTakeAnyFrom(). Другие примеры использования BlockingCollection<T> можно найти на сайте MSDN Code Gallery. Там имеется страничка с исходными кодами, на которых демонстрируется использование тех или иных средств распараллеливания кода, входящих в .NET 4. На момент публикации примеры есть только для .NET 4 beta 1, но скоро опубликуют комплект и для beta 2. В составе этих примеров есть, например, набор методов-расширений, позволяющий "преобразовать" данную коллекцию в объект типа IProducerConsumerCollection<T>. Хочу обратить внимание, что наиболее актуальная реализация рассматриваемой коллекции находится в составе .NET 4 beta 2, который стал доступен на днях. Если возникнет желание опробовать её в деле, рекомендую установить Visual Studio 2010 beta 2. По сравнению с прошлогодней июньской CTP-версией, работающей под .NET 3.5, некоторые методы были переименованы. Впрочем, это касается всей библиотеки Parallel Extensions. Полное описание BlockingCollection<T> можно найти в соответствующем разделе MSDN. |