Pythonのマルチスレッドの同期方法|LockやRlockを使ったやり方を解説する

スポンサーリンク

Pythonの同期について説明します。

マルチスレッドにより、コンピュータはシステム上に存在する複数のコア/複数のCPUを利用して、並行してアクションを実行することができます。

しかし、共有変数の読み込みと更新を同時に行う場合、誤った結果を招く可能性があります。

ここでは、正しい結果を得るためにスレッドを同期させる方法を学びます。

スポンサーリンク

マルチスレッドにおけるレースコンディションの理解

2つ以上のスレッドが同時に共有リソースにアクセスしてデータを変更しようとすると、そのような変数の最終的な値は予測不可能になります。

これは、スレッドスケジューリングアルゴリズムがいつでもスレッド間を入れ替えることができ、どのスレッドが最初に実行されるか分からないからです。

このようなシナリオをレースコンディションと呼びます。

スレッドを使って、ある銀行口座から別の銀行口座にある金額を送金する例を見てみましょう。

口座1から口座2へ1単位を送金するために、100個のスレッドを作成します。

import threading
import time
 
class BankAccount():
  def __init__(self, name, balance):
    self.name = name
    self.balance = balance
 
  def __str__(self):
    return self.name
 
# These accounts are our shared resources
account1 = BankAccount("account1", 100)
account2 = BankAccount("account2", 0)
 
class BankTransferThread(threading.Thread):
  def __init__(self, sender, receiver, amount):
    threading.Thread.__init__(self)
    self.sender = sender
    self.receiver = receiver
    self.amount = amount
   
  def run(self):
    sender_initial_balance = self.sender.balance
    sender_initial_balance -= self.amount
    # Inserting delay to allow switch between threads
    time.sleep(0.001)
    self.sender.balance = sender_initial_balance
     
    receiver_initial_balance = self.receiver.balance
    receiver_initial_balance += self.amount
    # Inserting delay to allow switch between threads
    time.sleep(0.001)
    self.receiver.balance = receiver_initial_balance
 
if __name__ == "__main__":
   
  threads = []
 
  for i in range(100):
    threads.append(BankTransferThread(account1, account2, 1))
 
  for thread in threads:
    thread.start()
 
  for thread in threads:
    thread.join()
 
  print(account1.balance)
  print(account2.balance)
account1 98
account2 3

最初は、口座1が100口、口座2が0口です。

1ユニットを100回転送した後、account1は0ユニット、account2は100ユニットになるはずです。

ところが、違う結果が出てしまいました。

これを複数回実行すれば、異なる結果が得られるはずです。

Python での同期 – スレッドを同期させるさまざまなメソッド

競合状態を回避するために、スレッドを同期させる方法を見てみましょう。

1. ロックオブジェクト

Lockオブジェクトは最も基本的な同期プリミティブで、ロックされても特定のスレッドに所有されることはありません。

ロックオブジェクトは、どのスレッドがロックを許可しているかという情報を保持せず、どのスレッドもロックを解放することができます。

Lock オブジェクトは、”locked” と “unlocked” の2つの状態のうちどちらかになります。

Lockオブジェクトが生成された時点では、”unlocked “状態になっています。

ロックオブジェクトには3つのメソッドしかありません。

  • acquire()。このメソッドは Lock オブジェクトを “unlocked” 状態から “locked” 状態に変更し、呼び出したスレッドが実行を継続できるようにします。Lockオブジェクトがすでに「locked」状態である場合、呼び出したスレッドはロックが「unlocked」状態になるまでブロックされます。
  • release(): このメソッドは、Lock オブジェクトの状態を “locked” から “unlocked” に変更します。もし Lock オブジェクトがすでに “unlocked” 状態であれば、RuntimeError が発生します。このメソッドは、ロックを取得したスレッドだけでなく、どのスレッドからでも呼び出すことができます。
  • locked(): このメソッドは Lock オブジェクトを取得した場合に true を返します。

それでは、銀行振込の例でPythonでLockオブジェクトを使用して同期を追加する方法を見てみましょう。

import threading
import time
 
class BankAccount():
  def __init__(self, name, balance):
    self.name = name
    self.balance = balance
 
  def __str__(self):
    return self.name
 
# These accounts are our shared resources
account1 = BankAccount("account1", 100)
account2 = BankAccount("account2", 0)
 
# Creating lock for threads
lock = threading.Lock()
 
class BankTransferThread(threading.Thread):
  def __init__(self, sender, receiver, amount):
    threading.Thread.__init__(self)
    self.sender = sender
    self.receiver = receiver
    self.amount = amount
   
  def run(self):
    lock.acquire()
     
    sender_initial_balance = self.sender.balance
    sender_initial_balance -= self.amount
    # Inserting delay to allow switch between threads
    time.sleep(0.001)
    self.sender.balance = sender_initial_balance
     
    receiver_initial_balance = self.receiver.balance
    receiver_initial_balance += self.amount
    # Inserting delay to allow switch between threads
    time.sleep(0.001)
    self.receiver.balance = receiver_initial_balance
     
    lock.release()
     
 
if __name__ == "__main__":
   
  threads = []
 
  for i in range(100):
    threads.append(BankTransferThread(account1, account2, 1))
 
  for thread in threads:
    thread.start()
 
  for thread in threads:
    thread.join()
 
  print(account1.name, account1.balance)
  print(account2.name, account2.balance)
account1 0
account2 100

Lock オブジェクトはどのスレッドが acquire() メソッドを呼び出したかを知らず、どのスレッドも acquire() を呼び出したスレッドから許可を得ることができるロックに対して release() を呼び出すことができます。

また、同じスレッドが release() を行わずに再び acquire() メソッドを呼び出した場合、そのスレッドはデッドロック状態になります。

import threading
 
lock = threading.Lock()
 
def funcA():
  print("In A, acquiring lock")
  lock.acquire()
   
  print("In A, lock acquired")
   
  print("In A, lock acquiring again and entering into deadlock")
  lock.acquire()
   
  print("In A, releasing lock")
  lock.release()
   
  print("In A, lock released")
 
def funcB():
  print("In B, releasing lock acquired by A")
  lock.release()
   
  print("In B, lock released")
 
if __name__ == "__main__":
  thread1 = threading.Thread(target=funcA)
  thread2 = threading.Thread(target=funcB)
 
  thread1.start()
  thread2.start()
 
  thread1.join()
  thread2.join()
In A, acquiring lock
In A, lock acquired
In A, lock acquiring again and entering into deadlock
In B, releasing lock acquired by A
In B, lock released
In A, releasing lock
In A, lock released

2. RLockオブジェクト

リエントラントロック(RLock)も同期プリミティブの1つで、同じスレッドが複数回ロックを取得してもデッドロック状態にならない。

RLockオブジェクトは、どのスレッドがロックの許可を持っているかを知っており、同じスレッドはロックを解除することができます。

RLockオブジェクトは “locked “と “unlocked “の2つの状態のいずれかになります。

RLockオブジェクトが生成されたときには、”unlocked “状態になっています。

RLockオブジェクトには、2つのメソッドしかありません。

  • acquire()。このメソッドは Lock オブジェクトを “unlocked” 状態から “locked” 状態に変更し、呼び出したスレッドが実行を継続できるようにします。同じスレッドがこのメソッドを再び呼び出すと、再帰レベルが1つ上がります。ロックを完全に解放するには、同じスレッドが同じ回数だけ release() を呼び出す必要がある。他のスレッドが「ロック」された状態でこのメソッドを呼び出すと、そのスレッドはブロックされる。
  • release(): このメソッドはロックを解放し、再帰レベルを1つ下げる。デクリメント後、再帰レベルが0になった場合、ロック状態はアンロック状態に変更される。デクリメント後も再帰レベルが0でない場合、ロックは呼び出したスレッドによって所有され、”ロック “されたままです。RLock オブジェクトが既に「unlocked」状態である場合、RuntimeError が発生します。
import threading
 
lock = threading.RLock()
 
def funcA():
  print("In A, acquiring lock")
  lock.acquire()
   
  print("In A, lock acquired, recursion level = 1")
   
  print("In A, acquiring lock again")
  lock.acquire()
   
  print("In A, lock acquired again, recursion level = 2")
   
  print("In A, releasing lock")
  lock.release()
   
  print("In A, lock released, recursion level = 1")
   
 
def funcB():
  print("In B, trying to acquire lock, but A released only once, so entering in deadlock state")
  lock.acquire()
  print("This statement won't be executed")
   
if __name__ == "__main__":
  thread1 = threading.Thread(target=funcA)
  thread2 = threading.Thread(target=funcB)
 
  thread1.start()
  thread2.start()
 
  thread1.join()
  thread2.join()
In A, acquiring l
In A, lock acquired, recursion level = 1
In A, acquiring lock again
In A, lock acquired again, recursion level = 2
In A, releasing lock
In A, lock released, recursion level = 1
In B, trying to acquire lock, but A released only once, so entering in deadlock state

3.

おわりに

この記事では、Pythonのスレッドモジュールを使って、レースコンディションを回避するためのPythonの同期を学びました。

Pythonの同期を実現するために、Lock、RLock、Semaphoresを使用しました。

タイトルとURLをコピーしました