Ich habe einen Producer und einen Consumer -Thread (threading.Thread), die einen queue vom Typ Queue gemeinsam nutzen.

Produzent run:

while self.running:
    product = produced() ### I/O operations
    queue.put(product)

Verbraucher run:

while self.running or not queue.empty():
    product = queue.get()
    time.sleep(several_seconds) ###
    consume(product)

Jetzt muss ich beide Threads vom Hauptthread aus beenden, mit der Anforderung, dass queue vor dem Beenden leer sein muss (alle verbraucht).

Derzeit verwende ich den folgenden Code, um diese beiden Threads zu beenden:

Hauptthread stop:

producer.running = False
producer.join()
consumer.running = False
consumer.join()

Aber ich denke, es ist unsicher, wenn es mehr Verbraucher gibt.

Außerdem bin ich mir nicht sicher, ob der sleep dem Hersteller einen Zeitplan zur Verfügung stellt, damit er mehr Produkte produzieren kann. Tatsächlich finde ich, dass der Produzent immer "hungert", aber ich bin mir nicht sicher, ob dies die Hauptursache ist.

Gibt es eine anständige Möglichkeit, mit diesem Fall umzugehen?

3
Hongxu Chen 9 Aug. 2015 im 17:27

3 Antworten

Beste Antwort

Bearbeiten 2:

A) Der Grund, warum sich Ihre Verbraucher immer wieder so viel Zeit nehmen, ist, dass Ihre Schleife auch dann kontinuierlich läuft, wenn Sie keine Daten haben.

B) Ich habe unten Code hinzugefügt, der zeigt, wie man damit umgeht.

Wenn ich Sie richtig verstanden habe, ist der Produzent / Verbraucher ein kontinuierlicher Prozess, z. Es ist akzeptabel, das Herunterfahren zu verzögern, bis Sie die aktuelle blockierende E / A verlassen und die von Ihnen empfangenen Daten verarbeiten.

In diesem Fall würde ich, um Ihren Produzenten und Konsumenten ordnungsgemäß herunterzufahren, die Kommunikation vom Haupt-Thread zum Produzenten-Thread hinzufügen, um ein Herunterfahren aufzurufen. Im allgemeinsten Fall kann dies eine Warteschlange sein, mit der der Hauptthread einen "Shutdown" -Code in die Warteschlange stellen kann. Im einfachen Fall eines einzelnen Produzenten, der gestoppt und nie neu gestartet werden soll, kann es sich jedoch einfach um ein globales Herunterfahren handeln Flagge.

Ihr Produzent sollte diese Abschaltbedingung (Warteschlange oder Flag) in seiner Hauptschleife überprüfen, bevor er eine blockierende E / A-Operation startet (z. B. nachdem Sie andere Daten an die Consumer-Warteschlange gesendet haben). Wenn das Flag gesetzt ist, sollte ein spezieller Datenendcode (der nicht wie Ihre normalen Daten aussieht) in die Warteschlange gestellt werden, um dem Verbraucher mitzuteilen, dass ein Herunterfahren stattfindet, und der Produzent sollte dann zurückkehren (beenden) selbst).

Der Verbraucher sollte geändert werden, um nach diesem Datenende-Code zu suchen, wenn er Daten aus der Warteschlange zieht. Wenn der Datenende-Code gefunden wird, sollte er ordnungsgemäß heruntergefahren und zurückgegeben werden (sich selbst beenden).

Wenn mehrere Verbraucher vorhanden sind, kann der Hersteller mehrere Datenendnachrichten - eine für jeden Verbraucher - in die Warteschlange stellen, bevor er heruntergefahren wird. Da die Verbraucher nach dem Lesen der Nachricht aufhören zu konsumieren, werden sie alle irgendwann heruntergefahren.

Wenn Sie im Voraus nicht wissen, wie viele Verbraucher es gibt, kann ein Teil des ordnungsgemäßen Herunterfahrens des Verbrauchers auch darin bestehen, den Code für das Datenende erneut in die Warteschlange zu stellen.

Dadurch wird sichergestellt, dass alle Verbraucher den Code für das Datenende sehen und herunterfahren. Wenn alles erledigt ist, befindet sich noch ein Element in der Warteschlange - der Code für das Datenende, der vom letzten Verbraucher in die Warteschlange gestellt wurde.

BEARBEITEN:

Die korrekte Darstellung Ihres Datenendcodes hängt stark von der Anwendung ab, aber in vielen Fällen funktioniert ein einfaches None sehr gut. Da None ein Singleton ist, kann der Verbraucher das sehr effiziente Konstrukt if data is None verwenden, um den Endfall zu behandeln.

Eine andere Möglichkeit, die in einigen Fällen noch effizienter sein kann, besteht darin, ein try /except außerhalb Ihrer Hauptkonsumentenschleife so einzurichten, dass Sie es versucht haben, wenn die Ausnahme passiert ist um die Daten auf eine Weise zu entpacken, die immer funktioniert, außer wenn Sie den Datenende-Code verarbeiten.

EDIT 2:

Wenn Sie diese Konzepte mit Ihrem ursprünglichen Code kombinieren, führt der Hersteller Folgendes aus:

while self.running:
    product = produced() ### I/O operations
    queue.put(product)
for x in range(number_of_consumers):
    queue.put(None)  # Termination code

Jeder Verbraucher tut dies:

while 1:
    product = queue.get()
    if product is None:
        break
    consume(product)

Das Hauptprogramm kann dann einfach Folgendes tun:

producer.running = False
producer.join()
for consumer in consumers:
    consumer.join()
2
Patrick Maupin 9 Aug. 2015 im 16:06

Sie können ein Sentinel-Objekt in die Warteschlange stellen, um das Ende von Aufgaben zu signalisieren, wodurch alle Verbraucher beendet werden:

_sentinel = object()

def producer(queue):
    while running:
       # produce some data
       queue.put(data)
    queue.put(_sentinel)

def consumer(queue):
    while True:
        data = queue.get()
        if data is _sentinel:
            # put it back so that other consumers see it
            queue.put(_sentinel)
            break
        # Process data

Dieses Snippet wurde schamlos aus Python Cookbook 12.3 kopiert.

  1. Verwenden Sie ein _sentinel, um das Ende der Warteschlange zu markieren. None funktioniert auch, wenn keine vom Produzenten erstellte Aufgabe None ist, die Verwendung von _sentinel jedoch für den allgemeineren Fall sicherer ist.
  2. Sie müssen nicht für jeden Verbraucher mehrere Endmarkierungen in die Warteschlange stellen. Möglicherweise wissen Sie nicht, wie viele Threads verbraucht werden. Stellen Sie den Sentinel einfach wieder in die Warteschlange, wenn ein Verbraucher ihn findet, damit andere Verbraucher das Signal erhalten.
6
NeoWang 9 Aug. 2015 im 16:21

Eine Beobachtung aus Ihrem Code ist, dass Ihr consumer weiterhin nach etwas aus der Warteschlange sucht. Idealerweise sollten Sie dies behandeln, indem Sie einige timeout und Empty Ausnahmen für dasselbe behandeln Wie unten hilft dies im Idealfall, die while self.running or not queue.empty() für jede timeout zu überprüfen.

while self.running or not queue.empty():
    try:
        product = queue.get(timeout=1)
    except Empty:
        pass
    time.sleep(several_seconds) ###
    consume(product)

Ich habe Ihre Situation simuliert und producer und consumer Threads erstellt. Nachfolgend finden Sie den Beispielcode, der mit 2 producers und 4 consumers ausgeführt wird. Er funktioniert sehr gut. hoffe das hilft dir!

import time
import threading

from Queue import Queue, Empty

"""A multi-producer, multi-consumer queue."""

# A thread that produces data
class Producer(threading.Thread):
    def __init__(self, group=None, target=None, name=None,
                 args=(), kwargs=None, verbose=None):
        threading.Thread.__init__(self, group=group, target=target, name=name,
                                  verbose=verbose)
        self.running = True
        self.name = name
        self.args = args
        self.kwargs = kwargs

    def run(self):
        out_q = self.kwargs.get('queue')
        while self.running:
            # Adding some integer
            out_q.put(10)
            # Kepping this thread in sleep not to do many iterations
            time.sleep(0.1)

        print 'producer {name} terminated\n'.format(name=self.name)


# A thread that consumes data
class Consumer(threading.Thread):

    def __init__(self, group=None, target=None, name=None,
                 args=(), kwargs=None, verbose=None):
        threading.Thread.__init__(self, group=group, target=target, name=name,
                                  verbose=verbose)
        self.args = args
        self.kwargs = kwargs
        self.producer_alive = True
        self.name = name

    def run(self):
        in_q = self.kwargs.get('queue')

        # Consumer should die one queue is producer si dead and queue is empty.
        while self.producer_alive or not in_q.empty():
            try:
                data = in_q.get(timeout=1)
            except Empty, e:
                pass

            # This part you can do anything to consume time
            if isinstance(data, int):
                # just doing some work, infact you can make this one sleep
                for i in xrange(data + 10**6):
                    pass
            else:
                pass
        print 'Consumer {name} terminated (Is producer alive={pstatus}, Is Queue empty={qstatus})!\n'.format(
            name=self.name, pstatus=self.producer_alive, qstatus=in_q.empty())


# Create the shared queue and launch both thread pools
q = Queue()

producer_pool, consumer_pool = [], []


for i in range(1, 3):
    producer_worker = Producer(kwargs={'queue': q}, name=str(i))
    producer_pool.append(producer_worker)
    producer_worker.start()

for i in xrange(1, 5):
    consumer_worker = Consumer(kwargs={'queue': q}, name=str(i))
    consumer_pool.append(consumer_worker)
    consumer_worker.start()

while 1:
    control_process = raw_input('> Y/N: ')
    if control_process == 'Y':
        for producer in producer_pool:
            producer.running = False
            # Joining this to make sure all the producers die
            producer.join()

        for consumer in consumer_pool:
            # Ideally consumer should stop once producers die
            consumer.producer_alive = False

        break
2
gsb-eng 9 Aug. 2015 im 17:02