2010年1月7日 星期四

Threading in Python

 
對現今的應用程式來說,
multi-thread 是不可或缺的部份。
那麼在 python 中要怎麼 implement multi-thread 呢?

這裡介紹兩個 python 所提供的 module -- thread and threading

thread module


使用 thread module 很像在 win32 的 CreateThread,
只是 CreateThread 換成了 thread.start_new_thread(function, args[, kwargs])
第一個參數就是 thread function,第二個參數是我們要傳進去的資料(tuples),
我們看一個簡單的例子:

#-*- coding: utf-8 -*-
#!/usr/bin/python

import thread
import time

def Threadfun(string, sleeptime, *args):
    while(True):
        print '{0}_{1}\n'.format(string, sleeptime)
        time.sleep(sleeptime)

if __name__ == "__main__":
    for i in range(1,5):
        thread.start_new_thread(Threadfun, ("ThreadFun", i))
    while(True):
        print 'MainThread {0}'.format(thread.get_ident())
        time.sleep(1)


以上就是很基本的建立多個執行緒的方法。

如果需要同步呢?
我們可以用 Lock Object。
Lock Object 可以藉由呼叫 allocate_lock() 得到。
先看看沒有同步會是什麼情況。

#-*- coding: utf-8 -*-
#!/usr/bin/python

import thread
import time

def Threadfun(string, sleeptime, lock, *args):
    while(True):
        print 'Enter_{0}\r\n'.format(string)
        time.sleep(sleeptime)
        print 'Leave_{0}\r\n'.format(string)

if __name__ == "__main__":
    lock = thread.allocate_lock()
    thread.start_new_thread(Threadfun, ("ThreadFun1", 2, lock))
    thread.start_new_thread(Threadfun, ("ThreadFun2", 2, lock))

    while (True):
        pass


執行的結果如下:













看出來了嗎?一個都還沒離開,另一個就進去了。

使用 Lock Object 來保護:
#-*- coding: utf-8 -*-
#!/usr/bin/python

import thread
import time

def Threadfun(string, sleeptime, lock, *args):
    while(True):
        lock.acquire()
        print 'Enter_{0}\r\n'.format(string)
        time.sleep(sleeptime)
        print 'Leave_{0}\r\n'.format(string)
        lock.release()

if __name__ == "__main__":
    lock = thread.allocate_lock()
    thread.start_new_thread(Threadfun, ("ThreadFun1", 2, lock))
    thread.start_new_thread(Threadfun, ("ThreadFun2", 2, lock))

    while (True):
        pass


執行結果如下:













這樣才是我們要的結果喔!
p.s. 根據 python 官方文件,thread module在 python 3.0 已經改為 __thread
可是還是建議使用高階的 threading module

threading module

threading module 裡面包含了以下 components
  • Lock object
  • RLock object
  • Semaphore Object
  • Condition Object
  • Event Object
  • Thread Object
threading.Thread

Thread Object wrapper 了 start_new_thread() 這個 function。

當我們呼叫 start() 時,就會自動去呼叫 run() ,
所以我們只能夠 override __init()__ 以及 run() 這兩個 method,
絕對不可以 override start()。

#-*- coding: utf-8 -*-
#!/usr/bin/python

from threading import Thread
import time

class MyThread(Thread):
    def __init__(self, string, sleeptime):
        Thread.__init__(self)
        self.sleeptime = sleeptime
        self.setName(str(sleeptime))
    def run(self):
        while(True):
            print 'Threadfun_{0}\r\n'.format(self.getName())
            time.sleep(self.sleeptime)
if __name__ == "__main__":
    thrList = [MyThread('ThreadFun', i) for i in range(1,5)]

    # thrList[0]~thrList[3]
    for i in range(0,4):
        thrList[i].start()
    # another way
    #for i in range(1,5):
    #    MyThread('ThreadFun', i).start()


另外一種作法是把 thread function 當參數,在建構物件時傳入,
那麼一樣在呼叫 start() 是,這個函式就會被執行。

threading.Condition

Condition 也是一種鎖,
也就是說它也提供 acquire/release 方法。
除此之外,
它還可以wait/notify/notifyAll,
但是要注意的一點是,
wait/notify/notifyAll 一定要跟 acquire/release 一起使用,
否則就會拋出 RuntimeError 異常。

我們可以用一個簡單的 productor/consumer 模型來當作範例。
#!/usr/bin/python
# -*- coding:utf-8 -*-

from threading import *
import time

class itemX:
    def __init__(self):
        self.cnt = 0

    def produce(self, num=1):
        self.cnt += 1

    def consume(self, num=1):
        if self.cnt:
            self.cnt -= 1
        else:
            print 'WARNING***********************WARNING'

    def isEmpty(self):
        return not self.cnt

    def getCount(self):
        return self.cnt

class Producer(Thread):
    def __init__(self, condition, item, sleeptime=2):
        Thread.__init__(self)
        self.con = condition
        self.item = item
        self.sleeptime = sleeptime

    def run(self):
        while (True):
            time.sleep(self.sleeptime)
            self.con.acquire()
            self.item.produce()
            print 'produce 1 product\r\n'
            print self.item.getCount()
            self.con.notifyAll()
            self.con.release()

class Consumer(Thread):
    def __init__(self, condition, item, sleeptime=2):
        Thread.__init__(self)
        self.con = condition
        self.item = item
        self.sleeptime = sleeptime
    def run(self):
        while (True):
            time.sleep(self.sleeptime)
            self.con.acquire()
            print '({0})enter'.format(self.getName())
            while self.item.isEmpty():
                print '({0})wait'.format(self.getName())
                self.con.wait()
            self.item.consume()
            print '({0})consume 1 product\r\n'.format(self.getName())
            print self.item.getCount()
            self.con.release()


if __name__ == "__main__":
    X = itemX()
    cond = Condition()
    Producer(cond, X).start()
    Consumer(cond, X).start()
    Consumer(cond, X).start()

    while (True):
        pass

threading.Event

event 是很常用的同步機制,
我們先用 event 改寫上一個例子,
再來討論它有什麼要注意的地方。

#!/usr/bin/python
# -*- coding:utf-8 -*-

from threading import *
import time

class itemX:
    def __init__(self):
        self.cnt = 0

    def produce(self, num=1):
        self.cnt += 1

    def consume(self, num=1):
        if self.cnt:
            self.cnt -= 1
        else:
            print 'WARNING***********************WARNING'

    def isEmpty(self):
        return not self.cnt

    def getCount(self):
        return self.cnt

class Producer(Thread):
    def __init__(self, condition, event, item, sleeptime=1):
        Thread.__init__(self)
        self.con = condition
        self.event = event
        self.item = item
        self.sleeptime = sleeptime

    def run(self):
        while (True):
            time.sleep(self.sleeptime)
            self.con.acquire()
            self.item.produce()
            print 'produce 1 product, remain({0})\r\n'.format(self.item.getCount())
            self.event.set()
            self.con.release()

class Consumer(Thread):
    def __init__(self, condition, event, item, sleeptime=1):
        Thread.__init__(self)
        self.con = condition
        self.event = event
        self.item = item
        self.sleeptime = sleeptime
    def run(self):
        while (True):
            time.sleep(self.sleeptime)
            self.con.acquire()
            print '({0})enter\r\n'.format(self.getName())
            #while self.item.isEmpty():
            while (True):
                print '({0})wait'.format(self.getName())
                self.event.wait()
                break
            self.item.consume()
            self.event.clear()
            print '({0})consume 1 product, remain({1})\r\n'.format(self.getName(), self.item.getCount())
            self.con.release()


if __name__ == "__main__":
    X = itemX()
    cond_Con = Condition()
    cond_Pro = Condition()
    event = Event()
    Producer(cond_Pro, event, X).start()
    Consumer(cond_Con, event, X).start()
    Consumer(cond_Con, event, X).start()

    while (True):
        pass


首先,
event 的 block 機制跟 condition 是很不一樣的,
對condition來說,notify 是一個通知,
有人在等,通知才有意義,
如果沒有人等,通知是沒有意義的。
notify 和 wait 是有序的,
就是說,假設通知先發生了,
這時候才有人開始等,
那麼是絕對等不到已經發生的那一個通知。

可是 event 不一樣,event 是一種狀態。
當 event 被設定了,
它就處於激發狀態,
只要狀態沒有改變,
任何時候都可以等得到。

為了強迫 consumer 一定要等到 event 才能 consume,
我們把上一個範例的 "while self.item.isEmpty():" 改成 "while (True):"
當然 Lock(Critical Section) 的保護也是必要的,
才不會發生多個 consumers 同時 consume 的情況(race condition)。

producer 的 Lock 在這個範例可以不要,
但如果有多個 producers 就一定要加,
特別要注意的一點是,
producer 用的 Lock 和 consumer 的必須要不一樣,
不然會出現 deadlock。

還有就是這邊的 event 是 manual reset,
也就是激發後必須手動呼叫 clear() 方法使其回到未激發狀態。

threading.Timer
Timer 也是 threading 的一個元件,
可以在指定的時間間隔後,執行某一個動作(函式),
例如:

#-*- coding: utf-8 -*-
#!/usr/bin/python

from threading import Time

def hello(msg):
    print msg

t = Timer(3, hello, ['Hello world'])
t.start()


Pooling Threads

最後想要討論一個有趣的東西。
不同的作業系統對可執行的 thread 限制都不一樣,
有時候我們擔心一次建立太多 thread 會造成系統(或程式)效能變差,
比如說我們只想建立兩個 threads,可是我們有十件工作要做,
那麼我們就可以用排程的概念來實做。

在程式的一開始先把兩個 thread 建好 (thread pool),
然後利用 python 的 Queue module,
把十件工作的資料都 put 進 Queue。
在 thread function 裡面會去 get Queue 的資料,
只要 get 到,thread 就會開始工作。

#!/usr/bin/python
#-*- coding:utf-8 -*-

from threading import *
import Queue
import time

class MyThread(Thread):
    def __init__(self, condition):
        Thread.__init__(self)
        self.cond = condition

    def run(self):
        print '{0} start\r\n'.format(self.getName())
        global cnt
        while (True):
            id = threadPool.get()
            if id != None:
            self.cond.acquire()
            print '{0}_{1}'.format(self.getName(), id)
            for x in xrange(101):
               cnt += x
               time.sleep(2)
            print 'cnt = {0}\r\n'.format(cnt)
            cnt = 0
            self.cond.release()
            threadPool.task_done()


threadPool = Queue.Queue(0)
condition = Condition()
cnt = 0
for i in xrange(2):
    MyThread(condition).start()

for i in xrange(10):
    threadPool.put(i)

threadPool.join()
print 'done'


在 thread function 最後呼叫 task_done() 是為了讓 Queue 知道這一個工作已經完成,
是給 Queue.join() 作為參考。

如果這些工作會存取到相同的資源,
還是記得要用 Lock 保護。

Queue module 是 thread safe,所以這樣的應用是沒有問題的。

沒有留言:

張貼留言