Основы работы с потоками в Python (исходники)Источник: Realcoding
ВведениеПотоки позволяют приложениям выполнять в одно и то же время множество задач. Многопоточность ( multi-threading ) важна во множестве приложений, от примитивных серверов до современных сложных и ресурсоёмких игр, так что, естественно, многие языки программирования поддерживают позможность работы с потоками. Python тоже входит в их число.
Однако, поддержка многопоточности в Python не обходится без ограничений и последствий, как писал Гвидо ван Россум: «К несчастью, для большинства смертных программирование потоков просто Слишком Сложное, чтобы делать его правильно… Даже в Python - всякий раз, как кто-то всерьёз берётся за программирование потоков, на меня обрушиваются тонны сообщений об ошибках, причём если причиной половины из них действительно являются ошибки в интерпретаторе Python, то причина второй половины кроется в недостаточном понимании особенностей многопоточности…»
Прежде чем мы приступим к разбору кода, работающего с потоками, нам нужно рассмотреть наиболее важную вещь - глобальную блокировку интерпретатора ( global interpreter lock , GIL ) Python. Если два или более потока попытаются манипулировать одним и тем же объектом в одно и то же время, то неизбежно возникнут проблемы. Глобальная блокировка интерпретатора исправляет это. В любой момент времени действия может выполнять только один поток. Python автоматически переключается между потоками, когда в этом возникает необходимость. Использование модуля ThreadingМодуль threading предоставляет нам простой способ работы с потоками. Его класс Thread может быть унаследован ( subclassed ) для создания потока или нескольких потоков. Метод run должен содержать код, который вы желаете выполнить при выполнении потока. Звучит просто, не так ли? Вот, посмотрите:
import threading class MyThread(threading.Thread): def run(self): print 'Insert some thread stuff here.' print 'It'll be executed...yeah....' print 'There's not much to it.' Выполнить поток также просто. Всё, что нам нужно сделать, это создать экземпляр нашего класса потока, после чего вызвать его метод start: import threading class MyThread(threading.thread): def run(self): print 'You called my start method, yeah.' print 'Were you expecting something amazing?' MyThread().start() Конечно, всего один поток это не бог весть что. Как и люди, потоки через некоторое время остаются в одиночестве. Давайте создадим группу потоков: import threading theVar = 1 class MyThread(threading.Thread): def run ( self ): global theVar print 'This is thread ' + str(theVar) + ' speaking.' print 'Hello and good bye.' theVar = theVar + 1 for x in xrange ( 20 ): MyThread().start() Давайте теперь сделаем с помощью модуля threading нечто условно-полезное. Сервера часто используют потоки для работы в одно и то же время с несколькими клиентами. Давайте создадим простой, но расширяемый сервер. Когда клиент подключится к нему, сервер создаст новый поток для обслуживания этого клиента. Чтобы отправлять потоку данные клиента нам понадобится перекрыть метод __init__ класса Thread, чтобы он принимал параметры. Отныне сервер будет отправлять поток своей дорогой и ждать новых клиентов. Каждый поток будет посылать упакованный ( pickled ) объект соответствующему клиенту, после чего печатать не более десяти строк, полученных от клиента. (Упакованный объект в общем случае является объектом, уменьшенным до нескольких символов. Это полезно при сохранении объектов для последующего использования или для передачи объектов по сети). import pickle import socket import threading # We'll pickle a list of numbers: someList = [1, 2, 7, 9, 0] pickledList = pickle.dumps(someList) # Our thread class: class ClientThread(threading.Thread): # Override Thread's __init__ method to accept the parameters needed: def __init__(self, channel, details): self.channel = channel self.details = details threading.Thread.__init__(self) def run(self): print 'Received connection:', self.details[0] self.channel.send(pickledList) for x in xrange(10): print self.channel.recv(1024) self.channel.close() print 'Closed connection:', self.details[0] # Set up the server: server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) server.bind(('', 2727)) server.listen(5) # Have the server serve "forever": while True: channel, details = server.accept() ClientThread(channel, details).start() Теперь нам нужно создать клиента, который будет подключаться к серверу, получать от него упакованный объект, распаковывать ( reconstructs ) объект и, наконец, посылать десять сообщений и закрывать соединение: import pickle import socket # Connect to the server: client = socket.socket(socket.AF_INET, socket.SOCK_STREAM) client.connect(('localhost', 2727)) # Retrieve and unpickle the list object: print pickle.loads(client.recv(1024)) # Send some messages: for x in xrange(10): client.send('Hey. ' + str(x) + 'n') # Close the connection client.close() Конечно, приведённый выше клиент не в состоянии воспользоваться всеми преимуществами многопоточности нашего сервера. Клиент порождает только один поток, что делает многопоточность бессмысленной. Давайте добавим клиенту многопоточности, чтобы сделать всё более интересным. Каждый поток будет подключаться к серверу и выполнять приведённый выше код: import pickle import socket import threading # Here's our thread: class ConnectionThread(threading.Thread): def run(self): # Connect to the server: client = socket.socket(socket.AF_INET, socket.SOCK_STREAM) client.connect (('localhost', 2727)) # Retrieve and unpickle the list object: print pickle.loads(client.recv(1024)) # Send some messages: for x in xrange(10): client.send('Hey. ' + str(x) + 'n') # Close the connection client.close() # Let's spawn a few threads: for x in xrange(5): ConnectionThread().start() Пулы потоков ( pooling threads )Важно помнить, что потоки не появляются мгновенно. Создание большого их числа может замедлить ваше приложение. Чтобы создать поток и, позднее, уничтожить его, требуется время. Потоки могут также потреблять много ценных системных ресурсов в больших приложениях. Эта проблема легко решается путём создания ограниченного числа потоков ( set number of threads ) (пула потоков) и назначения им новых задач, в общем, повторного их использования. Соединения будут приниматься и передаваться тому потоку, который раньше всех закончит работу с предыдущим клиентом.
Если вы по-прежнему не понимаете, сравните это с больницей. Скажем, у нас есть пятеро врачей. Это наши потоки. Пациенты (клиенты) приходят в больницу и, если врачи заняты, сидят в приёмном покое. Очевидно, нам нужно нечто, что сможет передавать данные клиента в наши потоки, не вызывая при этом проблем (оно должно быть «потокобезопасным»). Модуль Queue Python делает это для нас. Клиентская информация сохраняется в объекте Queue, откуда потоки извлекают её по мере надобности. Давайте переделаем наш сервер, чтобы оценить преимущества пула потоков: import pickle import Queue import socket import threading # We'll pickle a list of numbers, yet again: someList = [1, 2, 7, 9, 0] pickledList = pickle.dumps(someList) # A revised version of our thread class: class ClientThread(threading.Thread): # Note that we do not override Thread's __init__ method. # The Queue module makes this not necessary. def run(self): # Have our thread serve "forever": while True: # Get a client out of the queue client = clientPool.get() # Check if we actually have an actual client in the client variable: if client != None: print 'Received connection:', client[1][0] client[0].send(pickledList) for x in xrange(10): print client[0].recv(1024) client[0].close() print 'Closed connection:', client[1][0] # Create our Queue: clientPool = Queue.Queue(0) # Start two threads: for x in xrange(2): ClientThread().start() # Set up the server: server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) server.bind(('', 2727)) server.listen(5) # Have the server serve "forever": while True: clientPool.put(server.accept()) Как вы можете увидеть, он немного сложнее нашего предыдущего сервера, но не усложнён до полной непонятности. Для проверки этого сервера, так же, как и предыдущего, можно воспользоваться клиентом из предыдущего раздела. Дополнительные хитростиРабота с потоками не заключается только в их создании и уничтожении. Модуль (может, не модуль а класс?) Thread из модуля threading содержит ещё несколько методов, которые могут вам пригодиться. Первые два предназначены для именования потоков. Метод setName присваивает потоку имя, а метод getName возвращает имя потока:
import threading class TestThread(threading.Thread): def run(self): print 'Hello, my name is', self.getName() cazaril = TestThread() cazaril.setName('Cazaril') cazaril.start() ista = TestThread() ista.setName('Ista') ista.start() TestThread().start() Ничего удивительного. Также, как вы можете видеть, у потоков есть имена, даже если вы их не задавали. Мы также можем проверить, является ли поток «живым», воспользовавшись методом isAlive. Если поток ещё не закончил выполняться, независимо от того, что происходит в его методе run, то он классифицируется как «живой»: import threading import time class TestThread(threading.Thread): def run(self): print 'Patient: Doctor, am I going to die?' class AnotherThread(TestThread): def run (self): TestThread.run(self) time.sleep(10) dying = TestThread() dying.start() if dying.isAlive(): print 'Doctor: No.' else: print 'Doctor: Next!' living = AnotherThread() living.start() if living.isAlive(): print 'Doctor: No.' else: print 'Doctor: Next!' Второй поток остаётся в живых, поскольку мы заставили его ждать, воспользовавшись методом sleep модуля time. Если нам нужно, чтобы поток дождался завершения другого потока, можно воспользоваться методом join: import threading import time class ThreadOne(threading.Thread): def run(self): print 'Thread', self.getName(), 'started.' time.sleep ( 5 ) print 'Thread', self.getName(), 'ended.' class ThreadTwo(threading.Thread): def run(self): print 'Thread', self.getName(), 'started.' thingOne.join() print 'Thread', self.getName(), 'ended.' thingOne = ThreadOne() thingOne.start() thingTwo = ThreadTwo() thingTwo.start() Мы также можем использовать метод setDaemon. Если при вызове в него передаётся значение True и другие потоки завершили своё исполнение, то из основной программы будет произведён выход, а поток продолжит работу: import threading import time class DaemonThread(threading.Thread): def run(self): self.setDaemon(True) time.sleep(10) DaemonThread().start() print 'Leaving.' Python также содержит модуль thread, работающий на более низком уровне, чем threading. Хочу обратить ваше внимание на одну особенность: это содержащаяся в нём функция start_new_thread. Используя её мы можем превратить обычную функцию в поток: import thread def thread(stuff): print "I'm a real boy!" print stuff thread.start_new_thread(thread, ('Argument')) ЗаключениеО многопоточности можно рассказать значительно больше, чем я сделал в этой статье, но я не буду пытаться объять необъятное. Кроме того, как упомянул Гвидо ван Россум, преимущества, которые даёт сложная многопоточность в Python могут быть сведены на нет последствиями. Однако, небольшая доза здравого смысла может устранить большинство проблем в простой многопоточности.
Многопоточность очень важна, когда дело касается компьютерных приложений и, как я упоминал раньше, Python её поддерживает. При условии правильного использования, эффект от применения потоков может быть очень благотворным и часто даже критическим, как я подчеркивал в этой статье. |