生產者消費者問題

生產者消費者問題(英語:Producer-consumer problem),也稱有限緩衝問題Bounded-buffer problem),是一個多進程同步問題的經典案例。該問題描述了共享固定大小緩衝區的兩個進程——即所謂的「生產者」和「消費者」——在實際運行時會發生的問題。生產者的主要作用是生成一定量的數據放到緩衝區中,然後重複此過程。與此同時,消費者也在緩衝區消耗這些數據。該問題的關鍵就是要保證生產者不會在緩衝區滿時加入數據,消費者也不會在緩衝區中空時消耗數據。

要解決該問題,就必須讓生產者在緩衝區滿時休眠(要麼乾脆就放棄數據),等到下次消費者消耗緩衝區中的數據的時候,生產者才能被喚醒,開始往緩衝區添加數據。同樣,也可以讓消費者在緩衝區空時進入休眠,等到生產者往緩衝區添加數據之後,再喚醒消費者。通常採用進程間通信的方法解決該問題,常用的方法有信號燈法[1]等。如果解決方法不夠完善,則容易出現死鎖的情況。出現死鎖時,兩個線程都會陷入休眠,等待對方喚醒自己。該問題也能被推廣到多個生產者和消費者的情形。

實現

編輯

不完善的實現

編輯

下面這個解決方法會導致競爭條件。如果程序員不夠小心,那麼他就有可能寫出下面這種算法。該算法使用了兩個系統庫函數,sleep 和 wakeup。調用 sleep 的進程會被阻斷,直到有另一個進程用 wakeup 喚醒之。代碼中的 itemCount 用於記錄緩衝區中的數據項數。

int itemCount = 0;

procedure producer() {
    while (true) {
        item = produceItem();
        if (itemCount == BUFFER_SIZE) {
            sleep();
        }
        putItemIntoBuffer(item);
        itemCount = itemCount + 1;
        if (itemCount == 1) {
            wakeup(consumer);
        }
    }
}

procedure consumer() {
    while (true) {
        if (itemCount == 0) {
            sleep();
        }
        item = removeItemFromBuffer();
        itemCount = itemCount - 1;
        if (itemCount == BUFFER_SIZE - 1) {
            wakeup(producer);
        }
        consumeItem(item);
    }
}

上面代碼中的問題在於它可能導致競爭條件,進而引發死鎖。考慮下面的情形:

  1. 消費者把最後一個 itemCount 的內容讀出來,注意它現在是零。消費者返回到while的起始處,現在進入 if 塊;
  2. 就在調用sleep之前,CPU決定將時間讓給生產者,於是消費者在執行 sleep 之前就被中斷了,生產者開始執行;
  3. 生產者生產出一項數據後將其放入緩衝區,然後在 itemCount 上加 1;
  4. 由於緩衝區在上一步加 1 之前為空,生產者嘗試喚醒消費者;
  5. 遺憾的是,消費者並沒有在休眠,喚醒指令不起作用。當消費者恢復執行的時候,執行 sleep,一覺不醒。出現這種情況的原因在於,消費者只能被生產者在 itemCount 為 1 的情況下喚醒;
  6. 生產者不停地循環執行,直到緩衝區滿,隨後進入休眠。

由於兩個進程都進入了永遠的休眠,死鎖情況出現了。因此,該算法是不完善的。

使用信號燈的算法

編輯

信號燈可以避免上述喚醒指令不起作用的情況。該方法(見下面的代碼)使用了兩個信號燈,fillCount 和 emptyCount。fillCount 用於記錄緩衝區中將被讀取的數據項數(實際上就是有多少數據項在緩衝區里),emptyCount 用於記錄緩衝區中空閒空間數。當有新數據項被放入緩衝區時,fillCount 增加,emptyCount 減少。如果在生產者嘗試減少 emptyCount 的時候發現其值為零,那麼生產者就進入休眠。等到有數據項被消耗,emptyCount 增加的時候,生產者才被喚醒。消費者的行為類似。

semaphore fillCount = 0; // 生产的项目
semaphore emptyCount = BUFFER_SIZE; // 剩余空间

procedure producer() {
    while (true) {
        item = produceItem();
        down(emptyCount);
            putItemIntoBuffer(item);
        up(fillCount);
    }
}

procedure consumer() {
    while (true) {
        down(fillCount);
            item = removeItemFromBuffer();
        up(emptyCount);
        consumeItem(item);
    }
}

上述方法在只有一個生產者和一個消費者時能解決問題。對於多個生產者或者多個消費者共享緩衝區的情況,該算法也會導致競爭條件,出現兩個或以上的進程同時讀或寫同一個緩衝區槽的情況。為了說明這種情況是如何發生的,可以假設 putItemIntoBuffer() 的一種可能的實現:先尋找下一個可用空槽,然後寫入數據項。下列情形是可能出現的:

  1. 兩個生產者都減少 emptyCount 的值;
  2. 某一生產者尋找到下一個可用空槽;
  3. 另一生產者也找到了下一個可用空槽,結果和上一步被找到的是同一個空槽;
  4. 兩個生產者向可用空槽寫入數據。

為了解決這個問題,需要在保證同一時刻只有一個生產者能夠執行 putItemIntoBuffer()。也就是說,需要尋找一種方法來互斥地執行臨界區的代碼。為了達到這個目的,可引入一個二值信號燈 mutex,其值只能為 1 或者 0。如果把線程放入 down(mutex) 和 up(mutex) 之間,就可以限制只有一個線程能被執行。多生產者、消費者的解決算法如下:

semaphore mutex = 1;
semaphore fillCount = 0;
semaphore emptyCount = BUFFER_SIZE;

procedure producer() {
    while (true) {
        item = produceItem();
        down(emptyCount);
            down(mutex);
                putItemIntoBuffer(item);
            up(mutex);
        up(fillCount);
    }
}
procedure consumer() {
    while (true) {
        down(fillCount);
            down(mutex);
                item = removeItemFromBuffer();
            up(mutex);
        up(emptyCount);
        consumeItem(item);
    }
}

使用管程的算法

編輯

下列偽代碼展示的是使用管程來解決生產者消費者問題的辦法。由於管程一定能保證互斥,不必特地考慮保護臨界區[2]。也就是說,下面這個方法不用修改就可以推廣適用於任意數量的生產者和消費者的情況。

monitor ProducerConsumer {
    int itemCount;
    condition full;
    condition empty;

    procedure add(item) {
        while (itemCount == BUFFER_SIZE)
            wait(full);
        putItemIntoBuffer(item);
        itemCount = itemCount + 1;
        if (itemCount == 1)
            notify(empty);
    }

    procedure remove() {
        while (itemCount == 0)
            wait(empty);
        item = removeItemFromBuffer();
        itemCount = itemCount - 1;
        if (itemCount == BUFFER_SIZE - 1)
            notify(full);
        return item;
    }
}

procedure producer() {
    while (true) {
        item = produceItem()
               ProducerConsumer.add(item)
    }
}

procedure consumer() {
    while (true) {
        item = ProducerConsumer.remove()
               consumeItem(item)
    }
}

注意代碼中 while 語句的用法,都是用在測試緩衝區是否已滿或空的時候。當存在多個消費者時,有可能造成競爭條件的情況是:某一消費者在一項數據被放入緩衝區中時被喚醒,但是另一消費者已經在管程上等待了一段時間並移除了這項數據。如果 while 語句被改成 if,則會出現放入緩衝區的數據項過多,或移除空緩衝區中的元素的情況。

不使用信號燈或者管程

編輯

對於生產者消費者問題來說,特別是當只有一個生產者和一個消費者時,實現一個先進先出結構或者通信通道非常重要。這樣,生產者-消費者模式就可以在不依賴信號燈、互斥變量或管程的的情況下高效地傳輸數據。但如果採用這種模式,性能可能下降,因為實現這種模式的代價比較高。人們喜歡用先進先出結構或者通信通道,只是因為可以避免端與端之間的原子性同步。用 C 語言舉例如下,請注意:

  1. 該例繞開了對共享變量的原子性「讀-改-寫」訪問:每個 Count 變量都由單進程更新;
  2. 該例並不使進程休眠,這種做法依據系統不同是合理的。方法 sched_yield()只是為了看起來舒服點。完全可以去掉(注意:它後面的分號是不能去掉的)。 進程庫通常會要求信號燈或者條件變量控制進程的休眠和喚起,在多處理器環境中,進程的休眠和喚起發生的頻率比傳遞數據符號要小,因此避開對數據原子性操作是有利的。
volatile unsigned int produceCount, consumeCount;
TokenType buffer[BUFFER_SIZE];

void producer(void) {
    while (1) {
        while (produceCount - consumeCount == BUFFER_SIZE)
            sched_yield(); // 缓冲区满
        buffer[produceCount % BUFFER_SIZE] = produceToken();
        produceCount += 1;
    }
}

void consumer(void) {
    while (1) {
        while (produceCount - consumeCount == 0)
            sched_yield(); // 缓冲区空
        consumeToken( buffer[consumeCount % BUFFER_SIZE]);
        consumeCount += 1;
    }
}

Java 中的例子

編輯
import java.util.Stack;
import java.util.concurrent.atomic.AtomicInteger;

/**
    1个生产者 3个消费者 生产、消费10次

    @作者 pt

*/

public class ProducerConsumer {
    Stack<Integer> items = new Stack<Integer>();
    final static int NO_ITEMS = 10;

    public static void main(String args[]) {
        ProducerConsumer pc = new ProducerConsumer();
        Thread t1 = new Thread(pc.new Producer());
        Consumer consumer  = pc.new Consumer();
        Thread t2 = new Thread(consumer);
        Thread t3 = new Thread(consumer);
        Thread t4 = new Thread(consumer);
        t1.start();
        try {
            Thread.sleep(100);
        } catch (InterruptedException e1) {
            e1.printStackTrace();
        }
        t2.start();
        t3.start();
        t4.start();
        try {
            t2.join();
            t3.join();
            t4.join();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    class Producer implements Runnable {
        public void produce(int i) {
            System.out.println("Producing " + i);
            items.push(new Integer(i));
        }

        @Override
        public void run() {
            int i = 0;
            // 生产10次
            while (i++ < NO_ITEMS) {
                synchronized (items) {
                    produce(i);
                    items.notifyAll();
                }
                try {
                    // 休眠一段时间
                    Thread.sleep(10);
                } catch (InterruptedException e) {
                }
            }
        }
    }

    class Consumer implements Runnable {
        // consumed计数器允许线程停止
        AtomicInteger consumed = new AtomicInteger();

        public void consume() {
            if (!items.isEmpty()) {
                System.out.println("Consuming " + items.pop());
                consumed.incrementAndGet();
            }
        }

        private boolean theEnd() {
            return consumed.get() >= NO_ITEMS;
        }

        @Override
        public void run() {
            while (!theEnd()) {
                synchronized (items) {
                    while (items.isEmpty() && (!theEnd())) {
                        try {
                            items.wait(10);
                        } catch (InterruptedException e) {
                            Thread.interrupted();
                        }
                    }
                    consume();
                }
            }
        }
    }
}

相關條目

編輯

參考資料

編輯
  1. ^ Modern Operating Systems (Third Edition),Andrew S. Tanenbaum,2011. Prentice Hall
  2. ^ Operating System Concepts (Seventh Edition),Abraham Silberschatz,2010. Wiley