Pythonでマルチスレッド処理を行う際の生産者-消費者(Producer-Consumer)問題の実装を解説する

スポンサーリンク

この記事では、並行処理の古典的な問題であるProducer-Consumer問題について、Python Threadsを使用して解決する方法を学びます。

では、始めましょう。

こちらもお読みください。

スポンサーリンク

生産者・消費者問題とは何か?

生産者・消費者問題は、3つの要素から構成される。

1. バウンデッドバッファ

バッファは、異なるスレッドからアクセス可能な一時的なストレージです。

バッファの簡単な例としては、配列があります。

複数のスレッドが同時にバッファからデータを読み取ったり、バッファにデータを書き込むことができます。

バウンデッドバッファとは、容量が制限されており、その容量を超えてデータを保存することができないバッファのことです。

2. 2. プロデューサースレッド

Producer Thread とは、あるデータを生成してバッファに格納し、必要なデータがすべて生成されなくなるまで再び開始するスレッドです。

例えば、ネットワーク経由でデータをダウンロードし、それを一時的にバッファに格納するスレッドがこれにあたる。

3. 消費者スレッド

Consumer Thread は、バッファ内のデータを消費し、それを何らかのタスクに使用し、スレッドに割り当てられたタスクが完了しない限り再び開始するスレッドです。

例えば、インターネットからダウンロードしたデータを読み込んで、データベースに保存するスレッドがこれにあたります。

スレッドの稼働率が異なるとどうなるか?

スレッドが行う操作の速度は、割り当てられたタスクによって異なることがあります。

この場合、Producer スレッドが Consumer スレッドより遅いか、Producer スレッドが Consumer スレッドが消費する速度より速くデータを生成している可能性があります。

スレッドの動作速度が異なる場合、何らかの問題が発生する可能性があり、これがProducer-Consumer問題の言い分です。

  1. Producerスレッドがデータをバッファに生成しようとして、バッファが既に一杯になっていることが分かった場合、Producerスレッドはバッファ内にデータを追加することも、まだConsumerによって消費されていない既存のデータを上書きすることもできない。したがって,Producer Thread は,バッファからデータが消費されなくなるまで,自分自身を停止させる必要があります。このシナリオは、Producer Thread が高速であれば可能かもしれません。
    1. Consumer Threadがバッファからデータを消費しようとしているが、バッファが空であることがわかった場合、Consumer Threadはデータを受け取ることができないので、バッファにデータが追加されなくなるまで自分自身を停止させなければなりません。このシナリオは、Consumer Thread が高速であれば可能かもしれません。
    1. バッファは、バッファからデータに同時にアクセスできる異なるスレッド間で共有されているので、レースコンディションが発生する可能性があり、両方のスレッドが同時に共有バッファにアクセスするべきではありません。Producer スレッドがデータをバッファに追加して Consumer スレッドが待機するか、Producer スレッドが待機して Consumer スレッドが共有バッファで動作している間にデータを読み込むか、どちらかにすべきです。

セマフォを使った解決方法

この問題は、スレッド間の同期をとるためのツールであるセマフォを使って解決することができます。

Producer-Consumer問題の問題文で定義された3つの問題に取り組むために、3つのセマフォを維持します。

1.空。

このセマフォは、我々のバッファで空のスロットの数を格納します。

このセマフォの初期値は、私たちの境界付きバッファのサイズです。

もしこのセマフォの値がすでに 0 であれば、バッファが満杯であることを意味し、空のセマフォの値が 0 より大きくなるまで Producer スレッドをブロックします。

同様に、Consumer スレッドがバッファからデータを消費した後、このセマフォを解放してセマフォの値を 1 増加させます。

2. フル このセマフォは、バッファの中で満杯になっているスロットの数を格納します。

このセマフォの初期値は0です。

バッファからデータを消費する前に、Consumer Threadはこのセマフォを取得しようとします。

このセマフォの値がすでに0であれば、バッファはすでに空であることを意味し、フルセマフォの値が0より大きくなるまでConsumer Threadをブロックします。

同様に、Producer Threadはこのセマフォに1つの項目を追加した後、このセマフォを解放します。

3. ミューテックス このセマフォは、共有バッファに対して一度に1つのセマフォしか操作できないようにすることで、レースコンディションを処理します。

このセマフォの初期値は1です。

共有バッファに対して操作を行う前に、両方のスレッドがこのセマフォを獲得しようとします。

いずれかのスレッドがこのセマフォの値が0であることを発見した場合、これは他のスレッドがバッファ上で動作していることを意味し、セマフォによってブロックされます。

バッファを操作した後、作業中のスレッドはこのセマフォを解放し、他のスレッドがバッファを操作できるようにします。

また、スレッドがデータを追加したり取得したりするのに役立つように、2つのポインタを保持しています。

  • ポインタ。このポインタは Producer スレッドに、Producer が生成するバッファのどこに次のデータを追加するかを知らせます。追加された後,このポインタは 1 つインクリメントされます.
  • out ポインタ。このポインタは,Consumer Thread に,バッファから次のデータを読み込む場所を知らせます。読み込んだ後、このポインタは 1 つインクリメントされます。

Pythonによる生産者・消費者問題の実装

この問題をPythonでどのように解決するか、実装を確認してみましょう。

例えば、容量 10 のバッファがあるとします。

Producer Threadは20個のアイテムを生成し、Consumer ThreadはProducerが生成した20個のアイテムを消費します。

Producer に time.sleep(1) を、Consumer に time.sleep(2.5) を追加すると、Producer Thread は Consumer Thread よりも高速に動作するようになります。

Consumer Thread を先に起動しても、バッファにデータが無くなるまで待ちます。

import threading
import time
 
# Shared Memory variables
CAPACITY = 10
buffer = [-1 for i in range(CAPACITY)]
in_index = 0
out_index = 0
 
# Declaring Semaphores
mutex = threading.Semaphore()
empty = threading.Semaphore(CAPACITY)
full = threading.Semaphore(0)
 
# Producer Thread Class
class Producer(threading.Thread):
  def run(self):
     
    global CAPACITY, buffer, in_index, out_index
    global mutex, empty, full
     
    items_produced = 0
    counter = 0
     
    while items_produced < 20:
      empty.acquire()
      mutex.acquire()
       
      counter += 1
      buffer[in_index] = counter
      in_index = (in_index + 1)%CAPACITY
      print("Producer produced : ", counter)
       
      mutex.release()
      full.release()
       
      time.sleep(1)
       
      items_produced += 1
 
# Consumer Thread Class
class Consumer(threading.Thread):
  def run(self):
     
    global CAPACITY, buffer, in_index, out_index, counter
    global mutex, empty, full
     
    items_consumed = 0
     
    while items_consumed < 20:
      full.acquire()
      mutex.acquire()
       
      item = buffer[out_index]
      out_index = (out_index + 1)%CAPACITY
      print("Consumer consumed item : ", item)
       
      mutex.release()
      empty.release()     
       
      time.sleep(2.5)
       
      items_consumed += 1
 
# Creating Threads
producer = Producer()
consumer = Consumer()
 
# Starting Threads
consumer.start()
producer.start()
 
# Waiting for threads to complete
producer.join()
consumer.join()

結果は以下の通りです。

出力:

Producer produced :  1
Consumer consumed item :  1
Producer produced :  2
Producer produced :  3
Consumer consumed item :  2
Producer produced :  4
Producer produced :  5
Consumer consumed item :  3
Producer produced :  6
Producer produced :  7
Producer produced :  8
Consumer consumed item :  4
Producer produced :  9
Producer produced :  10
Consumer consumed item :  5
Producer produced :  11
Producer produced :  12
Producer produced :  13
Consumer consumed item :  6
Producer produced :  14
Producer produced :  15
Consumer consumed item :  7
Producer produced :  16
Producer produced :  17
Consumer consumed item :  8
Producer produced :  18
Consumer consumed item :  9
Producer produced :  19
Consumer consumed item :  10
Producer produced :  20
Consumer consumed item :  11
Consumer consumed item :  12
Consumer consumed item :  13
Consumer consumed item :  14
Consumer consumed item :  15
Consumer consumed item :  16
Consumer consumed item :  17
Consumer consumed item :  18
Consumer consumed item :  19
Consumer consumed item :  20

まとめ

これで、古典的なProducer-Consumer問題の解き方がわかったと思います。

複数のアプリケーションがドキュメントを印刷したい場合、ネットワーク経由でデータをダウンロードし、データベースに保存する場合など、同様の状況が起こりうる実生活の例はたくさんあります。

お読みいただきありがとうございました!

タイトルとURLをコピーしました