Trong bài viết này, tôi muốn giới thiệu với các bạn về các cơ chế, kỹ thuật đồng bộ trong lập trình đa luồng (multithreading). Các kỹ thuật được trình bày trong ngôn ngữ Python nhưng về nguyên lý đều có thể áp dụng cho các ngôn ngữ khác. Những từ khóa chính trong bài viết: multithreading
, Lock
, RLock
, Condition
, Event
, Queue
.
Giới thiệu
Trước hết ta cần phân biệt khái niệm luồng (thread) và tiến trình (process). Thế nào là luồng? Thế nào là tiến trình? và sự khác nhau giữa chúng là gì?
Hiểu đơn giản, Tiến trình (process) là một chương trình (program) đang được thực thi. Trong linux, ta có chương trình ls
nằm tại thư mục /usr/bin
, khi ta gõ ls
vào trong shell, hệ điều hành sẽ nạp file ls
vào bộ nhớ, cung cấp một số tài nguyên cần thiết và thực thi câu lệnh ls
. Khi đó ta có một tiến trình của chương trình ls
trong hệ thống. Một chương trình sẽ có thể có nhiều tiến trình, và các tiến trình này có các tài nguyên độc lập với nhau. Ta có thể chạy nhiều câu lệnh ls
đồng thời trong nhiều terminal. Tiến trình chứa đựng thông tin tài nguyên, trạng thái thực hiện của chương trình, bao gồm: định danh tiến trình (process ID), User ID, group ID, các thanh ghi, stack, heap, các file đang mở,...
Luồng (thread) là một khối các câu lệnh (instructions) độc lập trong một tiến trình và có thể được lập lịch bởi hệ điều hành. Hay nói một cách đơn giản, Thread là các hàm hay thủ tục chạy độc lập đối với chương trình chính. Hình bên dưới minh họa sự khác nhau giữa luồng và tiến trình.
nguồn: https://computing.llnl.gov/
Một tiến trình có thể có nhiều luồng và phải có ít nhất một luồng, ta gọi đó là luồng chính (ví dụng hàm main() trong ngôn ngữ C). Ngoài các tài nguyên riêng của mình (các biến cục bộ trong hàm), các luồng chia sẻ tài nguyên chung của tiến trình. Việc thay đổi tài nguyên chung (ví dụ, đóng file, gán giá trị mới cho biến) từ một thread sẽ được nhìn thấy bởi tất cả các thread khác.
Vì vậy, lập trình viên cần phải thực hiện đồng bộ việc truy cập tài nguyên chung giữa các luồng.
Tại sao cần phải đồng bộ
Trong lập trình đa luồng, các threads chia sẻ chung tài nguyên của tiến trình, vì vậy có những thời điểm nhiều luồng sẽ đồng thời thay đổi dữ liệu chung, dẫn đến dữ liệu chung sẽ bị corrupt, sẽ cho ra kết qủa không mong muốn.
Do đó, ta cần những cơ chể để đảm bảo rằng, tại một thời điểm chỉ có duy nhất một luồng được phép truy cập vào dữ liệu chung, nếu các luồng khác muốn truy cập vào đoạn dữ liệu này thì cần phải đợi cho thread trước đó hoàn thành công việc của mình. Ta gọi đấy là đồng bộ trong lập trình đa luồng.
Giả sử ta muốn xây dựng một ứng dụng web có lưu số lượng truy cập vào một biết toàn cục number_visited
. Mỗi lần có request gửi đến, server sẽ tạo ra một thread mới (trên thực tế, ta nên dùng Threads pool để quản lý các threads, tránh chi phí tạo một thread mới) để xử lý request đó. Giả sử có 2 request đồng thời gửi đến, lúc đấy sẽ có 2 thread song song là A
và B
, và giả dụ tại thời điểm đó, biến number_visited = 17
, sẽ có thể xuất hiện trường hợp sau:
Thread A | Thread B | number_visited |
---|---|---|
Read number_visited : 17 |
17 | |
Read number_visited : 17 |
17 | |
Increment number_visited : 18 |
17 | |
Increment number_visited : 18 |
17 | |
Write number_visited : 18 |
18 | |
Write number_visited : 18 |
18 |
Trong trường hợp trên, lẽ ra giá trị của number_visited
phải là 19 nhưng do 2 thread cùng truy cập vào biến chung nên dẫn đến dữ liệu bị sai.
Để giải quyết vấn đề này, ta cần phải đảm bảo rằng tại một thời điểm chỉ có một thread được quyền truy cập dữ liệu chung, điều này đảm bảo rằng kết quả của chương trình đa luồng phải chính xác như khi chương trình đó chạy tuần tự đơn luồng.
Lập trình đa luồng trong Python
Module threading của thư viện chuẩn Python cung cấp cho chúng ta các class và function để làm việc với thread, nó cũng cung cấp các cơ chế để đồng bộ luồng, bao gồm: Thread, Lock, RLock, Condition, Semaphore,Event,...
Note: Trong Cpython, Global Interpreter Lock - GIL giới hạn chỉ có một thread được chạy tại một thời điểm, nên về cơ bản Cpython sẽ không hoàn toàn hỗ trợ đa luồng, nhưng ta có thể sử dụng các Interpreter khác (Jython và IronPython) không sử dụng GIL vì vậy có thể chạy đa luồng.
Class Thread cung cấp cho chúng ta các phương thức cần thiết để làm việc với luồng, cụ thể ta có:
start()
: Method dùng để kích hoạt (chạy) một thread object, mặc định sẽ gọirun()
method.run()
: Đây là phương thức chính chúng ta cần cài đặt, mô tả các công việc mà luồng thực hiện. Mặc định phương thức này sẽ gọi hàm liên kết với đối số target lúc khởi tạo luồng.join()
: Khi được gọi, method này sẽ block thread gọi nó (calling thread) cho đến khi thread được gọi (called thread - tức là thread có method join() vừa gọi) kết thúc. Method này thường được dùng trong luồng chính để đợi các thread khác kết thúc công việc của mình và xử lý tiếp kết qủa.- Ngoài ra class Thread còn cung cấp thêm một số thuộc tính và phương thức nữa như là:
name
,getName()
,setName()
,... Bạn có thể tham khảo thêm tại [1]
Để tạo một luồng mới với class Thread, ta có 2 cách:
- Truyền hàm cần thực hiện thông qua tham số
target
lúc khởi tạo Thread object. - Kế thừa class Thread và cài đặt method
run()
Đoạn code dưới đây minh họa 2 cách tạo create như trên:
import threading
import math def target_func(data): thread_id = threading.get_ident() print('Thread {} is running with data: {}'.format(thread_id, data)) class WorkerThread(threading.Thread): def __init__(self, data): super().__init__() self.data = data # Initiliaze data for thread def run(self): # This method is invoked when starting a thread # Do the work of thread here. print('Thread {} is running with data: {}'.format(self.ident, self.data)) if __name__ == '__main__': a = 'goodkat' b = 'godfather' # Create thread by passing target_func to target param thread1 = threading.Thread(target=target_func, args=(a,)) # Or by using CheckPrimeThread thread2 = WorkerThread(b) # Start threads thread1.start() thread2.start() # Wait for thread1, thread2 to terminate thread1.join() thread2.join() print('Main thread exited') # Output:
# Thread 140280251209472 is running with data: goodkat
# Thread 140280242816768 is running with data: godfather
# Main thread exited
Các cơ chế đồng bộ trong Python
Trong phần này, tôi sẽ trình bày về các cơ chế đồng bộ trong lập trình đa luồng thông qua bài toán đếm số nguyên tố: Cho số nguyên dương N, liệt kê ra các số nguyên tố trong đoạn từ 2 đến N.
Nếu giải bài toán này theo cách tuần tự (đơn luồng) thông thường thật đơn giản. Chỉ cần cài đặt hàm kiểm tra một số đầu vào có là số nguyên tố hay không, sau đó kiểm tra từng số trong đoạn từ 2 đến N, nếu là số nguyên tố thì in ra màn hình.
Trong lập trình đa luồng, có 3 yếu tố ta cần phải quan tâm đó là:
- Chia nhỏ bài toán: Chia nhỏ khối lượng công việc cần tính toán và giao cho mỗi luồng thực hiện riêng một phần công việc đó.
- Cân bằng tải giữa các luồng: Ta cần đảm bảo khối lượng tính toán mà mỗi luồng phải thực hiện càng bằng nhau càng tốt.
- Đồng bộ giữa các luồng khi các luồng chia sẻ tài nguyên với nhau: Ta cần đảm bảo tính toàn vẹn của dữ liệu khi có nhiều luồng đồng thời truy cập vào dữ liệu chung.
Chia nhỏ bài toán: Trong bài toán trên, ta có 2 cách để chia nhỏ bài toán:
- Chia dãy số từ 2 -> N thành các đoạn, nếu chương trình có K luồng, thì ta sẽ có K đoạn (có số lượng phần tử bằng nhau), và mỗi luồng tính toán trong từng đoạn riêng biệt.
- Lưu các số 2 -> N vào một mảng chung, nếu một luồng nào đấy rảnh sẽ lấy ra một số trong mảng đấy để kiểm tra, nếu là số nguyên tố thì thêm nó vào một mảng chứa các số nguyên tố (mảng này cũng được chia sẻ giữa các luồng).
Về cân bằng tải giữa luồng: Ta dễ thấy rằng nếu số càng lớn thì khối lượng tính toán để kiểm tra xem số đó có là số nguyên tố hay không càng lớn, vì vậy với cách chia thứ nhất, những luồng xử lý trên dãy các số nguyên bé sẽ hoàn thành công việc sớm hơn, và những luồng về sau sẽ càng mất nhiều thời gian để tính toán hơn, cho dù số phần tử của mỗi đoạn là bằng nhau. Do đó, ta thấy rằng cách chia thứ nhất cân bằng tải giữa các luồng không được tốt.
Với cách chia thứ hai, các luồng sẽ luân phiên nhau lấy ra một gía trị trong mảng chung, kiểm tra tính nguyên tố của gía trị đó một cách độc lập, sau khi hoàn thành công việc sẽ lấy một gía trị khác để kiểm tra, ta thấy rằng công việc được phân bố đều giữa các luồng, vì vậy cách chia thứ hai cho ta cân bằng tải tốt hơn.
Về đồng bộ luồng: Ở đây ta xét cách chia thứ hai, các luồng sẽ chia sẻ 2 tài nguyên chung đó là mảng các số 2 -> N và mảng chứa các số nguyên tố. Ta cần đảm bảo tại mỗi thời điểm chỉ có duy nhất một luồng được truy cập tài nguyên chung, nếu luồng khác muốn truy cập phải đợi (block) cho đến khi luồng trước đó cập nhật xong. Python cung cấp cho chúng ta các class cơ bản để thực hiện đồng bộ luồng bao gồm: Lock
, RLock
, Condition
, Event
, Semaphore
,...
Lock
Lock là cơ chế đồng bộ cơ bản nhất của Python. Một Lock gồm có 2 trạng thái, locked và unlocked, cùng với 2 phương thức để thao tác với Lock là acquire()
và release()
. Các quy luật trên Lock là:
- Nếu trạng thái (state) là unlocked, gọi acquire() sẽ thay đổi trạng thái sang locked
- Nếu trạng thái là locked, tiến trình gọi acquire() sẽ phải đợi (block) cho đến khi tiến trình khác gọi method release().
- Nếu trạng thái là unlocked, gọi method release() sẽ phát ra RuntimeError exception.
- Nếu trạng thái là locked, gọi method release() sẽ chuyển trạng thái của Lock sang unlocked.
Tại mỗi thời điểm, chỉ có nhiều nhất một thread sở hữu Lock. Với cơ chế Lock, ta có thể đồng bộ bài toán đếm số nguyên tố như sau:
import math
import threading def is_prime(n): """Check if n is prime or not """ root = int(math.sqrt(n)) for i in range(2, root + 1): if n % i == 0: return False return True class CountPrime(threading.Thread): def __init__(self, list_num, list_lock, out_lock, output): threading.Thread.__init__(self) self._list = list_num # list of number from 2 to N self._list_lock = list_lock # Lock for list_num self._out_lock = out_lock # Lock for output self._output = output # list of prime numbers def run(self): while True: # request to access shared resource # if there are many threads acquiring Lock, only one thread get the Lock # and other threads will get blocked self._list_lock.acquire() try: n = next(self._list) # pop a number in list_num except StopIteration: return finally: # release the Lock, so other thread can gain the Lock to access list_num self._list_lock.release() if not is_prime(n): continue self._out_lock.acquire() self._output.append(n) self._out_lock.release() def parallel_primes(n, num_threads=None): """Parallel count number of prime from 2 to n Count prime numbers from 2 to n using num_threads threads If num_threads is None, using os.cpu_count() """ list_num = (i for i in range(2, n + 1)) list_lock = threading.Lock() out_lock = threading.Lock() output = [] threads = [] if num_threads is None: try: num_threads = os.cpu_count() except AttributeError: num_threads = 4 elif num_threads < 1: raise ValueError('num_threads must be > 0') for i in range(num_threads): thread = CountPrime(list_num, list_lock, out_lock, output) threads.append(thread) thread.start() for thread in threads: thread.join() return output
Rlock (Reentrant Lock)
Tương tự như Lock và sử dụng khái niệm "owning thread" (tiến trình sở hữu) và "recursion level" (có thể gọi method acquire() nhiều lần).
- Owning thread: Rlock object lưu giữ định danh (ID) của thread sở hữu nó. Do đó, khi một thread sở hữu Rlock, thread đó có thể gọi method acquire() nhiều lần mà không bị block (Khác với Lock, khi một thread đã dành được khóa, các tiến trình gọi acquire() sẽ bị block cho đến khi khóa được release). Hơn nữa, đối với Rlock, khóa chỉ được unlocked khi thread sở hữu gọi method release() (các lời gọi acquire() và release() có thể lồng nhau) và số lần gọi release() phải bằng với số lần gọi method acquire() trước đó (khác với Lock, khóa có thể được unlocked khi một thread bất kỳ gọi method release()).
- recursion level: Một khi thread dành được khóa, thread đó có thể gọi acquire() nhiều lần mà không bị block.
Bởi vì đặc điểm trên, Rlock có thể hữu ích cho các hàm đệ quy:
import threading lock = threading.Rlock() def do_recursion_job(...): # If we use Lock() instead of RLock(), # this function will be blocked in the second call. lock.acquire() do_recursion_job(...) lock.release()
Hoặc trong những trường hợp ta muốn duy trình mối quan hệ sở hữu, ta muốn lock chỉ được giải phóng (release) với thread sở hữu nó.
Condition
Đây là cơ chế đồng bộ được sử dụng khi một thread muốn đợi (block) một điều kiện nào đấy xảy ra và một thread khác sẽ thông báo (notify) khi điều kiện đã xảy ra, lúc đấy thread đang đợi sẽ được đánh thức và dành được khóa để truy cập độc quyền vào tài nguyên chung (không có thread nào khác được phép truy cập vào tài nguyên khi thread này chưa realse khóa).
Biến điều kiện luôn liên kết với một khóa bên dưới (Lock hoặc RLock). Biến điều kiện cung cấp các method chính như sau:
acquire(*args)
: Method này đơn giản chỉ trả về method acquire() của khóa bên dưới.release()
: Tương tự acquire(), gọi mthod release() của khóa bên dưới.wait(timeout=None)
: Đợi cho đến khi được thông báo (notify) hoặc timeout. Nếu thread gọi method này khi chưa dành đươc khóa thì một RuntimeError sẽ được ném ra. Phương thức này giải phóng khóa bên dưới và block cho đến khi một thread khác gọi notify() hay notify_all() trên cùng biến điều kiện, hoặc timeout xảy ra. Một khi được thức giấc, thread dành lại khóa của mình và tiếp tục thực hiện công việc.notify(n=1)
: Method dùng để đánh thức (notify) nhiều nhất là n thread đang đợi điều kiện này xảy ra. RuntimeError exception sẽ được ném ra nếu thread gọi hàm này vẫn chưa dành (acquire) được khóa bên dưới. Lưu ý rằng method notify() không giải phóng khóa bên dưới, do đó thread gọi notify() cần phải gọi release() method tường minh để các thread đang đợi điều kiện (wait()) có thể dành được khóa.notify_all()
: Đánh thức tất cả các thread đang đợi điều kiện này. Method này chỉ đơn giản gọi notify() với đối số n là tất cả các thread.
Một ví dụ minh họa cực hay cho cơ chế đồng bộ này đó là bài toán producer/cosumer (sản-xuât/tiêu-thụ). Thread producer thêm (sản-xuất) một số nguyên ngẫu nhiên tới một mảng chung tại những thời điểm ngẫu nhiên và thread consumber lấy (tiêu-thụ) ra những số nguyên từ mảng chung đó.
Trong đoạn code bên dưới, Producer object dành (acquire) được khóa, thêm một số nguyên vào mảng, thông báo (notify) cho Consumber thread rằng đã có dữ liệu (integer) để sử dụng và giải phóng (release) khóa.
class Producer(threading.Thread): """ Producers random integers to a list """ def __init__(self, integers, condition): threading.thread.__init__(self) self.integers = integers self.condition = condition def run(self): """ Append random integers to integers list at random time. """ while True: integer = random.randint(0, 256) self.condition.acquire() print('condition acquired by {}'.format(self.name)) self.integers.append(integer) print('{} appended to list by {}'.format(integer, self.name)) print('condition notified by {}'.self.name) self.condition.notify() print('condition released by {}'.format(self.name)) self.condition.release() time.sleep(1)
Tiếp đến là Consumer object, nó dành được khóa, kiểm tra xem có phần tử nào trong mảng không. Nếu mảng rỗng, nó đợi (wait) cho đến khi được thông báo (notify) bởi producer. Một khi phần tử được lấy về từ mảng chung, consumer giải phóng khóa.
Lưu ý rằng lời gọi wait() giải phóng khóa vì vậy producer có thể dành được tài nguyên và thực hiện công việc của mình.
class Consumer(threading.Thread): """ consumes random integers for a list """ def __init__(self, integers, condition): threading.Thread.__init__(self) self.integers = integers self.condition = condition def run(self): """ Consumes integers from shared list """ while True: self.condition.acquire() print('condition acquired by {}'.format(self.name)) while True: if self.integers: integer = self.integers.pop() print('{} popped from list by {}'.format(integer, self.name)) break print('conditon wait by {}'.format(self.name)) self.condition.wait() print('condition released by {}'.format(self.name)) self.condition.release()
Đoạn code dưới đây ta sẽ cài đặt hàm main, tạo ra 2 thread consumer và producer (file condition.py):
def main(): integers = [] condition = threading.Condition() producer = Producer(integers, condition) consumer = Consumer(integers, condition) producer.start() consumer.start() producer.join() consumer.join() if __name__ == '__main__': main()
Đầu ra khi chạy chương trình:
$ python3 condition.py
$ python condition.py
condition acquired by Thread-1
159 appended to list by Thread-1
condition notified by Thread-1
condition released by Thread-1
condition acquired by Thread-2
159 popped from list by Thread-2
condition released by Thread-2
condition acquired by Thread-2
condition wait by Thread-2
condition acquired by Thread-1
116 appended to list by Thread-1
condition notified by Thread-1
condition released by Thread-1
116 popped from list by Thread-2
condition released by Thread-2
condition acquired by Thread-2
condition wait by Thread-2
Semaphore
Đây là một trong những cơ chế đồng bộ lâu đời nhất trong lịch sử khoa học máy tính, được phát minh bởi nhà khoa học máy tính Edsger W. Dijkstra (trong đó ông sử dụng P() và V() thay vì acquire() và release()).
Một Semaphore duy trì một biến đếm (không âm) được truyền vào khi khởi tạo một Semaphore object, có giá trị giảm sau mỗi lần gọi acquire() và tăng sau mõi lần gọi method release(). Khi gọi method acquire() trên một Semaphore object với giá trị biến đếm bằng 0, nó sẽ block thread gọi và đợi cho đến khi thread khác gọi release() (làm tăng giá trị biến đếm lên 1).
Semaphore cũng cung cấp 2 method là acquire()
và release()
tương tự như cơ chế Lock hay RLock, chỉ khác ở chỗ method acquire() sẽ trả về ngay tức thì khi biến đếm có giá trị lớn hơn không.
Với cơ chế như trên, Semaphore thường được dùng để giới hạn số lượng thread đồng thời truy cập vào tài nguyên chúng, ví dụ, giới hạn số lượng kết nối tới một database server. Dễ nhận thấy rằng Lock là một trường hợp riêng của Semaphore trong đó biến đếm được khởi tạo bằng 1.
max_connections = 1000 class WorkerThread(threading.Thread): def __init__(self, pool_sema): threading.Thread.__init__(self) self.pool_sema = pool_sema def run(self): self.pool_sema.acquire() conn = connect_db() try: # do actions on the connection #.... finally: conn.close() self.pool_sema.release() def main(): # There are atmost max_connections threads can connect to database server. pool_sema = threading.Semaphore(value=max_connections) for i in range(1, 3000): thr = WorkerThread(pool_sema) thr.start()
Event
Event là cơ chế đồng bộ đơn giản trong đó một thread sẽ phát ra một sự kiện (event) và các thread khác đợi sự kiện đó.
Một Event object quản lý một cờ trong (internal flag), được thiết lập True với method set()
và thiết lập False với clear()
. Tiến trình gọi method wait()
bị block cho đến khi cờ này có giá trị True.
Quay trở lại với bài toán producer/consumer ở trên, ta sẽ cài đặt nó sử dụng Event thay vì Condition. Mỗi lần một số nguyên được thêm vào mảng chung, sự kiện sẽ được thiết lập (set) và cờ sẽ được clear ngay sau đó để thông báo tới consumer. Lưu ý rằng cờ được clear (False) khi khởi tạo Event object.
class Producer(threading.Thread): """ Procudes random integers to a list """ def __init__(self, integers, event): threading.Thread.__init__(self) self.integers = integers self.event = event def run(self): """ Append random integers to the shared list at random time. """ while True: integer = random.randint(0, 256) self.integers.append(integer) print('{} appended to list by {}'.format(integer, self.name)) print('event set by {}'.format(self.name)) self.event.set() self.event.clear() print('event cleared by {}'.format(self.name)) time.sleep(1)
Tiếp đến là Consumer class, ta cũng truyền một Event object cho hàm khởi tạo. Thread consumer sẽ bị block (khi gọi method wait()) cho đến khi sự kiện được phát ra, cho thấy rằng mảng chung đã có dữ liệu.
class Consumer(threading.Thread): """ Consumes random integers from a list """ def __init__(self, integers, event): threading.Thread.__init__(self) self.integers = integers self.event = event def run(self): """ Consumes integers from the list """ while True: self.event.wait() try: integer = self.integers.pop() print('{} popped from list by {}'.format(integer, self.name)) except IndexError: time.sleep(1)
Dưới đây ta sẽ cài đặt hàm main để thử nghiệm (file event.py):
def main(): integers = [] event = threading.Event() producer = Producer(integers, event) consumer = Producer(integers, event) producer.start() consumer.start() producer.join() consumer.join() if __name__ == '__main__': main()
Đầu ra của đoạn chương trình:
$ python3 event.py
124 appended to list by Thread-1
event set by Thread-1
event cleared by Thread-1
124 popped from list by Thread-2
223 appended to list by Thread-1
event set by Thread-1
event cleared by Thread-1
223 popped from list by Thread-2
Kết luận
Phuuu, bạn vẫn đọc đến đây đấy chứ. Hy vọng bài viết hữu ích cho những ai đang tìm hiểu các cơ chế đồng bộ khi lập trình đa luồng trong Python. Bài viết rất mong nhận được sự góp ý cũng như trao đổi từ các bạn. Thank you!