| |||
| © Perl Mongers Italia. Tutti i diritti riservati. | |||
PrefazioneImmaginiamo di avere una lista molto lunga di oggetti da elaborare: questa lista omogena richiede per ogni elemento una serie di operazioni su sistemi esterni, che pero' impiegano un po di tempo a rispondere. Sebbene questo problema sembri altamente scalabile, nella pratica potrebbe risultare difficile da parallelizzare per via di limiti ambientali, tipo un limite sulle risorse usabili; se per esempio dovessimo fare una serie di chiamate http e registrarne gli esiti su un db, allora potremmo aprire anche molte connessioni http ma poche sul db o addirittura solo 1. In questi casi puo' venir comodo un pool per le risorse limitate dove ogni singolo thread puo' ottenere temporaneamente un handler e permettere cosi' di avere tanti processi che condividono poche risorse. (Nell'esempio sopra avremmo tanti thread quante connessioni vogliamo aprire, che a termine dell'operazione chiedono in prestito una connessione a db per registrarne l'esito, quindi restituirla) Sfortunatamente questo e' possibile solo usando appunto i threads; in multi-tasking > non e' possibile condividere le risorse visto che i task sono isolati fra loro. E' possibile usare un'altro approccio, su un unico task posso operare su piu' connessioni in maniera asincrona...
Sincrono vs asincrono
SincronoPer prima cosa, vediamo cosa significano questi due termini, il modo migliore e' con un esempio.
sub sync {
my ($request) = @_;
my $conn = openConn();
$conn->send($request);
my $response = $conn->recv();
$conn->close();
return $response;
}
In questo esempio si apre una connessione e si inviano dati, quindi si aspetta di ricevere una risposta. E' possibile (in certi casi) migliorare le prestazioni aprendo una connessione una volta sola e usandola piu' volte:
sub sync {
my ($request, $conn) = @_;
$conn->send($request);
my $response = $conn->recv();
return $response;
}
Ci possiamo anche aspettare un meccanismo piu' complicato, dove le operazioni sono piu' d'una:
sub sync {
my ($request, $conn) = @_;
$conn->send($request->head);
my $ack = $conn->recv();
$ack->is_ok or die;
$conn->send($request->body);
my $response = $conn->recv();
return $response;
}
A questo punto scriviamo il corpo del programma:
my @list = fetchList();
my $conn = openConn();
my $db = openDB();
foreach my @item (@list) {
my $response = sync($item, $conn);
$db->storeResult($item, $response);
}
Se volessimo parallelizzare usando piu' task
my $NTASK = 4;
my @list = fetchList();
# prepariamo delle partizioni in round-robin, una per ogni figlio
my @partitions = map { []; } (1..$NTASK);
for(my $i=0; $i < scalar @list; $i++) {
my $tid = $i % $NTASK;
push @{$partitions[$tid]}, $list[$i];
}
# fork di ogni child
my @childs;
foreach my $tid (1..$NTASK) {
my $pid = fork();
defined $pid or die;
if($pid) {
# sono il padre, mi registro il pid
push @childs, $pid;
} else {
# sono il figlio, iniziamo a lavorare
work($partitions[$tid]);
exit;
}
}
# e aspettiamo che finiscano
map { waitpid($_); } @childs;
# ogni figlio lavorera' sulla sua partizione di dati
sub work
{
my ($list) = @_;
my $conn = openConn();
my $db = openDB();
foreach my $item ( @$list ) {
my $response = sync($item);
$db->storeResult($item, $response);
}
}
E' evidente che cosi' per ogni child dovro' fare una chiamata a
AsincronoSe il tempo che impiega il metodo In questo caso possiamo rompere il metodo sync in due parti, una prima che invia i dati e una seconda che li elabora. Il nostro programma quindi iniviera' i dati a tante connessioni senza bloccarsi aspettando una risposta, per poi raccogliere le risposte in seguito.
Wrapper della connessioneCreiamo quindi un oggetto di connessione nostro che useremo come involucro attorno alla connessione vera. Iniziamo immaginando un caso semplice, dove si fa solo una richiesta e si ottiene solo una risposta:
package Conn;
sub new
{
my ($class, $conn)
bless [
$conn, # la connessione
undef, # spazio per la closure INTERNA di lettura
], $class;
}
sub async
{
my ($self, $request, $on_success) = @_;
die "already in use" if $self->[1]; # morte agli stacanovisti
$self->[0]->send($request); # invio i dati
$self->[1] = sub { # registro la closure INTERNA di lettura
my $response = $self->[0]->recv();
$on_success->($response);
return undef;
};
}
sub dispose
{
my ($self) = @_;
return unless $self->[1]; # se non c'e' nulla da fare...
$self->[1] = $self->[1]->();
}
sub is_working
{
my ($self) = @_;
return defined $self->[1];
}
Il wrapper sopra puo' esser costruito partendo da una connessione, e espone 2 metodi: Il suo uso e' piuttosto semplice:
my $conn = Conn->new(openConn());
my $db = openDB();
# ...
$conn->async(
$request, # la richiesta
sub { # la callback
my $result = @_;
$db->storeResult($request, $result);
}
);
# ...
$conn->dispose();
Vediamo meglio cosa succede in questo disegno dove il tempo scorre verso il basso:
Note: ---> chiama o ritorna ...> definisce
1 - Alla chiamata 2 - a questo punto, il codice puo' fare altro... 3 - dopo un po di tempo, si chiama il metodo 4 - 5 - 6 - la closure 7 -
Tanti Wrapper per 1 solo taskDobbiamo pero' riempire i
# apro NTASK connessioni ma solo 1 a db
my @conns = map { Conn->new(openConn()); } (1..$NTASK);
my $db = openDB();
# ottengo tutti i dati
my @list = fetchList();
my $cid = 0;
foreach my $item (@list)
{
# prendo una connessione in round-robin
my $conn = $conns[$cid++ % $NTASK];
# chiamo dispose cosi' da sbrigare il lavoro arretrato
$conn->dispose;
$conn->async($request, sub { # callback
my $result = @_;
$db->storeResult($request, $result);
});
}
# finiamo le cose ancora in sospeso
map { $_->dispose(); } grep { $_->[1] } @conns;
Ecco che gia' si vede un netto miglioramento: si puo' assegnare a Inoltre si vede che per ogni elemento della lista ci si sposta di 1 nella lista di connessioni, quindi si chiamera' Se per esempio ogni risposta richiedesse 10 secondi e usassimo 100 connessioni allora le prime 100 richieste verranno inviate quasi istantaneamente. Alla 101-esima la dispose blocchera' per 10 secondi, per poi registrare la prima risposta sul db. Alla 102-esima la dispose non blocchera', visto che sono gia' passati 10 secondi! Idealmente se le risposte richiedessero esattamente 10 secondi l'una avremmo un atteggiamento a ondate dove partono 100 richieste e poi si blocca, poi altre 100 e poi di nuovo si blocca.
Richieste a piu' fasiPrima abbiamo immaginato una richiesta a due fasi, dove in pratica si deve prima inviare una richiesta e dopo una seconda... ma come farlo usando il wrapper? In realta' abbiamo gia' quasi tutto pronto, basta che la prima closure
sub async
{
my ($self, $request, $on_success, $on_error) = @_;
die "already in use" if $self->[1]; # morte agli stacanovisti
$self->[2] = $on_error;
$self->[0]->send($request->head);
$self->[1] = sub { # <-------------------------- INTERNAL prima fase
my $ack = $self->[0]->recv();
# se NAK, cancelliamo l'operazione
unless($ac->is_ok()) {
$ack->is_ok() or $on_error("NAK", $request, $ack);
return undef;
}
$self->[0]->send($request->body);
return sub { # <------------------------ INTERNAL seconda fase
my response = $self->[0]->recv();
if($response->is_ok()) { $on_success->($response); }
else { $on_error->("RESPONSE KO", $request, $response);
return undef;
};
}
}
Vediamo meglio cosa abbiamo aggiunto: La close Abbiamo aggiunto anche una seconda callback Dobbiamo inoltre modificare l'uso, visto che una dispose potrebbe non esser abbastanza:
my @conns = map { Conn->new(openConn()); } (1..$NTASK);
my $db = openDB();
my @list = fetchList();
my $cid = 0;
foreach my $item (@list)
{
my $conn;
while(1) {
# prendo una connessione in round-robin
$conn = $conns[$cid++ % $NTASK];
# chiamo dispose cosi' da sbrigare il lavoro arretrato
$conn->dispose;
# se ha finito... altrimenti avanti il prossimo
last unless $conn->is_working;
}
$conn->async($request, sub { # callback
my $result = @_;
$db->storeResult($request, $result);
});
}
# finiamo le cose ancora in sospeso
map { $_->dispose() while $_->is_working(); } @conns;
Qui le cose si complicano un pochino, ma ormai avete raggiunto la meta' di questo articolo, tanto vale finire no? Immaginiamo sempre il nostro caso teorico dove ogni richiesta richiede 10 secondi e abbiamo 100 connessioni. Per le prima 100 richieste, come gia' succedeva', invieremo la prima parte della richiesta quasi istantaneamente. Arrivati alla 101-esima richiesta, pero', le cose cambiano: Infatti si chiamera' la dispose sulla prima connessione che blocchera' fino ad ottenere la risposta della prima fase, e inviera' la richiesta per la seconda fase. Purtroppo pero' la connessione NON e' ancora disponibile, si dovra' dunque iterare nel ciclo e tentare con la seconda, che non blocchera' piu' per 10 secondi ma si spostera' anch'essa alla seconda fase e via dicendo, fino a tornare alla prima. La prima nuovamente blocchera' per 10 secondi, terminera' dunque il lavoro e ritornera' disponibile all'uso, procendendo dunque all'effettivo invio della 101-esima richiesta, 20 secondi dopo. Le successive richieste non blocceranno (come per la 101-esima): esattamente come volevamo.
IO::SelectIl sistema sopra funziona egregiamente quando i tempi di risposta sono abbastanza costanti, ma quando alcune risposte impiegano molto piu' di altre allora possono sorgere problemi di efficienza. Immaginiamo di avere solo 2 istanze di wrapper, al primo e' stata fatta una richiesta che richiedera' 10 secondi mentre al secondo una richiesta che richiedera' 1 secondo. Quando arrivera' la terza richiesta, questa blocchera' per 10 secondi sul primo slot anche se avrebbe potuto aspettare solo 1 secondo se avesse usato il secondo. Se il nostro wrapper usa direttamente degli handler per comunicare, e' possibile usare la syscall Per semplificare la gestione useremo il modulo the IO::Select manpage, ecco un esempio:
use IO::Select;
my $sel = IO::Select->new();
map { $sel->add($_); } @handlers;
# ....
my @ready = $sel->can_read($timeout);
Nell'array Il parametro Modifichiamo quindi radicalmente il nostro wrapper e rendiamolo un vero pool di connessioni wrapped:
package Pool;
use IO::Select;
use IO::Socket;
sub new {
my ($class, %args) = @_;
bless {
%args,
_slots = {},
_sel = IO::Select->new();
}, $class;
}
sub _create {
my ($self) = @_;
my $slot = [
IO::Socket::INET->new($self->{host}),
undef, # INTERNAL
undef, # ON_ERROR
];
my $fd = $slot->[0]->fileno or die;
$slot->[0]->autoflush(1);
$self->{_slots}->{$fd} = $slot;
$self->{_sel}->add($slot->[0]);
return $slot;
}
sub _find {
my ($self) = @_;
$self->dispose(0);
while(1) {
my $ct = 0;
foreach my $slot (values %{$self->{_slots}})
{
$ct++;
return $slot unless $slot->is_working();
}
return $self->_create() if $ct < $self->{size};
$self->dispose(0.5);
}
}
sub close {
my ($self) = @_;
OUTER: while(1) {
foreach my $slot (values %{$self->{_slots}})
{
if($slot->[1]) {
$self->dispose(0.5);
continue OUTER;
}
}
break;
}
}
sub dispose {
my ($self, $timeout) = @_;
# get what is ready for data
my @ready = $self->{_sel}->can_read($timeout);
foreach my $fh (@ready) {
# get the slot
my $fd = $fh->fileno;
my $slot = $self->{_slot}->{$fd};
# invoke che INTERNAL
eval { $slot->[1] = $slot->[1]->(); };
next unless $@;
my $err = $@;
# remove the slot
delete $self->{_slot}->{$fd};
$self->{_sel}->remove($slot->[0]);
# call the on_error
eval { $slot->[2]->($err); };
}
}
sub async {
my ($self, $req, $on_success, $on_error) = @_;
# find a slot
my $slot = $self->_find();
my $fh = $slot->[0];
# register the on_error
$slot->[2] = $on_error;
# send first part of request
$fh->print($req->head);
# register the response handler
$slot->[1] = sub {
my $ack = <$fh>;
# die, the dispose() will fire the on_error
die "NAK" unless $ack =~ /^OK/;
# send the second part of the request
$fh->print($req->body);
# register the new handler
return sub {
my $res = <$fh>;
die "KO" unless $res =~ /^200/;
# all went fine, call on_success
$on_success->($res);
# free the slot
return undef;
};
};
}
Molte cose nuove, per prima cosa abbiamo un oggetto unico che conterra' tutte le connessioni aperte, definito con un massimo di connessioni disponibili. Il metodo Accetta anche un parametro di Il metodo Il vantaggio, oltre che l'efficienza, e' una piu' semplice usabilita' all'esterno:
my $pool = Pool->new(
host => '1.2.3.4:1234',
size => 100,
);
my $db = openDB();
foreach my $req (@list)
{
$pool->async( $req,
sub {
my ($result) = @_;
$db->storeResult($req, $result);
},
sub {
warn "error while $req: @_";
});
}
$pool->close();
Il metodo Visto che Usando the IO::Select manpage abbiamo inoltre risolto il problema delle richieste a tempo non costante:
se avessimo come prima massimo 2 connessioni impegnate dove la prima rispondera' fra 10 secondi e la seconda solo dopo 1 secondo,
la chiamata a
Cosa aggiungere ora?Il sistema usato sopra non fa piu' round-robin ma gestisce le connessioni in ordine sparso. E' possibile quindi aggiungere altri parametri allo slot, come un timeout per la lettura e un idle_time oltre il quale chiudere la connessione. In questo modo le connessioni oltre ad aprirsi quando richiesto possono anche chiudersi dopo un tempo di E' possibile inoltre distruggere le connessioni dopo qualche migliaio di usi, cosi' da liberare eventuali risorse allocate lato server. Per fare questo aggiungiamo a
sub dispose {
my ($self, $timeout) = @_;
# get what is ready for data
my @ready = $self->{_sel}->can_read($timeout);
my %closing;
foreach my $fh (@ready) {
# get the slot
my $fd = $fh->fileno;
my $slot = $self->{_slot}->{$fd};
# invoke che INTERNAL
eval {
$slot->[1] = $slot->[1]->();
$slot->[3] = time(); # aggiorno last_use
};
next unless $@;
# mark for closing
$closing{$fd} = $@;
}
next unless time() > $self->{_next_expire_check};
$self->{_next_expire_check} = time();
foreach my $slot ( values %{ $self->{_slots} } )
{
my $timeout = $slot->[1] # e' in uso?
? $self->{read_timeout}
: $self->{idle_timeout};
# confronto il last_use
next unless $slot->[3]+$timeout < time();
# se e' passato troppo tempo
$closing{$slot->[0]->fileno} ||= "TIMEOUT";
}
# per tutte quelle in closing...
while( my($fd, $err) = each %closing )
{
my $slot = $self->{_slots}->{$fd};
# remove the slot
delete $self->{_slots}->{$fd};
$self->{_sel}->remove($slot->[0]);
# call the ERROR callback if still in
eval { $slot->[2]->($err); } if $slot->[1];
}
}
e aggiungiamo alla sub
$slot->[3] = time();
ora non serve altro che impostare questi valori durante la costruzione:
my $pool = Pool->new(
host => '1.2.3.4:1234',
size => 100,
read_timeout => 30,
idle_timeout => 120,
);
E il resto rimane uguale, ora le connessioni che non rispondono in 30 secondi chiameranno la callback di errore con
ConclusioniCerto si e' dovuto scrivere un mucchio di codice, e la gestione degli errori e' complessa, ma i vantaggi sono evidenti: Un solo task gestisce piu' connessioni, una sorta di reattore. E' necessario pero' tener presente che la gestione degli errori puo' esser complessa! | |||