2016-08-04 31 views
0

Ich bin der Verarbeitung von Dateien mit der Funktion und haben meine Themen wie folgt beginnen:Perl Threads und Semaphore

for my $file (@files){ 
    $threads[$k] = threads->create('function', $file); 
    $k++; 
} 

Ich möchte die Anzahl der parallelen Prozesse begrenzen. Wie ist das gemacht? Ich habe mir viele Semaphore/Queue-Beispiele angeschaut, konnte aber nichts so Einfaches für meinen Fall finden.

Gibt es eine Idee, wie ich die Thread-Nummer einfach begrenzen kann?

+0

Während [diese Antwort] (http://stackoverflow.com/a/12371377/1331451) über Gabeln ist, nicht Threads, ist es verwandt. Threads gehören nicht zu den stärksten Features von Perl. Sie könnten besser eine andere Technik verwenden. – simbabque

+0

Also ist das Forks() Beispiel überhaupt nicht hilfreich, da diese Funktionen nicht in Threads() implementiert sind. – Alex

+0

Der Fork-Vorschlag sollte Sie dazu ermutigen, keine Bedrohungen zu verwenden, sondern auf Forking zurückgreifen. – simbabque

Antwort

6

Der einfachste Weg zur Begrenzung der Parallelität und auch eine der effektiveren Möglichkeiten zur Bereitstellung von Threads ist die Verwendung eines "Worker Thread" -Modells.

Insbesondere - haben Sie einen Thread, der in einer Schleife sitzt, eine Warteschlange liest und darauf arbeitet. so etwas wie diese

Das wäre:

#!/usr/bin/perl 

use strict; 
use warnings; 

use threads; 

use Thread::Queue; 

my $nthreads = 5; 

my $process_q = Thread::Queue->new(); 
my $failed_q = Thread::Queue->new(); 

#this is a subroutine, but that runs 'as a thread'. 
#when it starts, it inherits the program state 'as is'. E.g. 
#the variable declarations above all apply - but changes to 
#values within the program are 'thread local' unless the 
#variable is defined as 'shared'. 
#Behind the scenes - Thread::Queue are 'shared' arrays. 

sub worker { 
    #NB - this will sit a loop indefinitely, until you close the queue. 
    #using $process_q -> end 
    #we do this once we've queued all the things we want to process 
    #and the sub completes and exits neatly. 
    #however if you _don't_ end it, this will sit waiting forever. 
    while (my $server = $process_q->dequeue()) { 
     chomp($server); 
     print threads->self()->tid() . ": pinging $server\n"; 
     my $result = `/bin/ping -c 1 $server`; 
     if ($?) { $failed_q->enqueue($server) } 
     print $result; 
    } 
} 

#insert tasks into thread queue. 
open(my $input_fh, "<", "server_list") or die $!; 
$process_q->enqueue(<$input_fh>); 
close($input_fh); 

#we 'end' process_q - when we do, no more items may be inserted, 
#and 'dequeue' returns 'undefined' when the queue is emptied. 
#this means our worker threads (in their 'while' loop) will then exit. 
$process_q->end(); 

#start some threads 
for (1 .. $nthreads) { 
    threads->create(\&worker); 
} 

#Wait for threads to all finish processing. 
foreach my $thr (threads->list()) { 
    $thr->join(); 
} 

#collate results. ('synchronise' operation) 
while (my $server = $failed_q->dequeue_nb()) { 
    print "$server failed to ping\n"; 
} 

Semaphore sind wirklich über den Zugang zu begrenzten Ressourcen Schlichtung, und für ‚Bewachung‘ Teil eines Prozesses.

Also, wenn Sie einschließen wollte - sagen - ein ssh Betrieb in Ihrem Code, wollte aber nicht mehr als 20 Verbindungen gleichzeitig haben, würden Sie:

my $ssh_limit = Thread::Semaphore -> new (20); 

Und in deinen Thread:

$ssh_limit -> down; 
#do ssh thing 
$ssh_limit -> up; 

Jeder Thread wird blockiert, bis die Ressource verfügbar ist.

Aber dies ist keine effektive Möglichkeit, "ganze Threads" zu steuern - die Antwort dort ist nur die richtige Nummer an erster Stelle zu starten, und verwenden Sie Warteschlangen, um sie Daten zu füttern.