Francesco Oha Rivetti
Async Howto
http://www.perl.it/documenti/articoli/2010/03/async-howto.html
© Perl Mongers Italia. Tutti i diritti riservati.

Vincitore Contest2009

Prefazione

Immaginiamo di avere una lista molto lunga di oggetti da elaborare: questa lista omogena richiede per ogni elemento una serie di operazioni su sistemi esterni, che pero' impiegano un po di tempo a rispondere.

Sebbene questo problema sembri altamente scalabile, nella pratica potrebbe risultare difficile da parallelizzare per via di limiti ambientali, tipo un limite sulle risorse usabili; se per esempio dovessimo fare una serie di chiamate http e registrarne gli esiti su un db, allora potremmo aprire anche molte connessioni http ma poche sul db o addirittura solo 1.

In questi casi puo' venir comodo un pool per le risorse limitate dove ogni singolo thread puo' ottenere temporaneamente un handler e permettere cosi' di avere tanti processi che condividono poche risorse. (Nell'esempio sopra avremmo tanti thread quante connessioni vogliamo aprire, che a termine dell'operazione chiedono in prestito una connessione a db per registrarne l'esito, quindi restituirla)

Sfortunatamente questo e' possibile solo usando appunto i threads; in multi-tasking > non e' possibile condividere le risorse visto che i task sono isolati fra loro.

E' possibile usare un'altro approccio, su un unico task posso operare su piu' connessioni in maniera asincrona...

Sincrono vs asincrono

Sincrono

Per prima cosa, vediamo cosa significano questi due termini, il modo migliore e' con un esempio.


        sub sync {
          my ($request) = @_;

          my $conn = openConn();
          $conn->send($request);

          my $response = $conn->recv();
          $conn->close();
          return $response;
        }

In questo esempio si apre una connessione e si inviano dati, quindi si aspetta di ricevere una risposta. E' possibile (in certi casi) migliorare le prestazioni aprendo una connessione una volta sola e usandola piu' volte:

        sub sync {
          my ($request, $conn) = @_;

          $conn->send($request);
          my $response = $conn->recv();
          return $response;
        }

Ci possiamo anche aspettare un meccanismo piu' complicato, dove le operazioni sono piu' d'una:

        sub sync {
          my ($request, $conn) = @_;

          $conn->send($request->head);
          my $ack = $conn->recv();
          $ack->is_ok or die;

          $conn->send($request->body);
          my $response = $conn->recv();
          return $response;
        }

A questo punto scriviamo il corpo del programma:

        my @list = fetchList();
        my $conn = openConn();
        my $db = openDB();

        foreach my @item (@list) {
          my $response = sync($item, $conn);
          $db->storeResult($item, $response);
        }

Se volessimo parallelizzare usando piu' task

        my $NTASK = 4;
        my @list = fetchList();

        # prepariamo delle partizioni in round-robin, una per ogni figlio
        my @partitions = map { []; } (1..$NTASK);
        for(my $i=0; $i < scalar @list; $i++) {
                my $tid = $i % $NTASK;
                push @{$partitions[$tid]}, $list[$i];
        }

        # fork di ogni child
        my @childs;
        foreach my $tid (1..$NTASK) {
                my $pid = fork();
                defined $pid or die;
                if($pid) {
                        # sono il padre, mi registro il pid
                        push @childs, $pid;
                } else {
                        # sono il figlio, iniziamo a lavorare
                        work($partitions[$tid]);
                        exit;
                }
        }
        # e aspettiamo che finiscano
        map { waitpid($_); } @childs;
        
        # ogni figlio lavorera' sulla sua partizione di dati
        sub work
        {
                my ($list) = @_;
                my $conn = openConn();
                my $db = openDB();
                foreach my $item ( @$list ) {
                        my $response = sync($item);
                        $db->storeResult($item, $response);
                }
        }

E' evidente che cosi' per ogni child dovro' fare una chiamata a openConn() e una a openDB(). E' gia' un bel passo avanti certo, ma il numero di child non puo' esser elevato perche' il numero di openDB non puo' esserlo.

Asincrono

Se il tempo che impiega il metodo sync() e' molto elevato, finiremmo con avere un pugno di task che aspettano una risposta nel 99% del tempo e non possiamo aumentare il numero di child perche' -- come abbiamo detto -- sul db non possiamo fare molte connessioni.

In questo caso possiamo rompere il metodo sync in due parti, una prima che invia i dati e una seconda che li elabora. Il nostro programma quindi iniviera' i dati a tante connessioni senza bloccarsi aspettando una risposta, per poi raccogliere le risposte in seguito.

Wrapper della connessione

Creiamo quindi un oggetto di connessione nostro che useremo come involucro attorno alla connessione vera. Iniziamo immaginando un caso semplice, dove si fa solo una richiesta e si ottiene solo una risposta:

        package Conn;
        
        sub new
        {
                my ($class, $conn)
                bless [
                        $conn,  # la connessione
                        undef,  # spazio per la closure INTERNA di lettura
                ], $class;
        }

        sub async
        {
                my ($self, $request, $on_success) = @_;
                die "already in use" if $self->[1]; # morte agli stacanovisti

                $self->[0]->send($request); # invio i dati

                $self->[1] = sub { # registro la closure INTERNA di lettura
                        my $response = $self->[0]->recv();
                        $on_success->($response);
                        return undef;
                };
        }

        sub dispose
        {
                my ($self) = @_;
                return unless $self->[1]; # se non c'e' nulla da fare...
                $self->[1] = $self->[1]->();
        }

        sub is_working
        {
                my ($self) = @_;
                return defined $self->[1];
        }

Il wrapper sopra puo' esser costruito partendo da una connessione, e espone 2 metodi: async e dispose.

Il suo uso e' piuttosto semplice:

        my $conn = Conn->new(openConn());
        my $db = openDB();
        # ...
        $conn->async(
                $request, # la richiesta
                sub {     # la callback
                    my $result = @_;
                    $db->storeResult($request, $result);
                }
        );
        # ...
        $conn->dispose();

Vediamo meglio cosa succede in questo disegno dove il tempo scorre verso il basso:

diagramma_oha.png Note: ---> chiama o ritorna ...> definisce

1 - Alla chiamata async() si passa la $request e una closure in grado di registrare sul db il risultato, che chiameremo callback.

2 - async() invia la richiesta e registra la closure INTERNAL nel wrapper, che chiude la callback al suo interno.

a questo punto, il codice puo' fare altro...

3 - dopo un po di tempo, si chiama il metodo dispose()

4 - dispose() invoca la closure registrata nel wrapper: INTERNAL

5 - INTERNAL blocca finche' non arriva la risposta.

6 - la closure callback viene quindi invocata e registrera' l'esito sul DB.

7 - dispose() imposta come closure interna il valore da lei stessa ritornato, cioe' undef

Tanti Wrapper per 1 solo task

Dobbiamo pero' riempire i #... del codice sopra, e' cioe' vedere come far funzionare tutto questo:

        # apro NTASK connessioni ma solo 1 a db
        my @conns = map { Conn->new(openConn()); } (1..$NTASK);
        my $db = openDB();

        # ottengo tutti i dati
        my @list = fetchList();
        my $cid = 0;

        foreach my $item (@list)
        {
                # prendo una connessione in round-robin
                my $conn = $conns[$cid++ % $NTASK];

                # chiamo dispose cosi' da sbrigare il lavoro arretrato
                $conn->dispose;

                $conn->async($request, sub { # callback
                        my $result = @_;
                        $db->storeResult($request, $result);
                });
        }

        # finiamo le cose ancora in sospeso
        map { $_->dispose(); } grep { $_->[1] } @conns;

Ecco che gia' si vede un netto miglioramento: si puo' assegnare a $NTASK un numero anche molto grande, visto che cmq useremo solo una connessione a db! E non solo, non abbiamo piu' tanti task e una fork da gestire, ma tutto in un unico processo, ancora meglio!

Inoltre si vede che per ogni elemento della lista ci si sposta di 1 nella lista di connessioni, quindi si chiamera' dispose() sullo stesso wrapper solo dopo che verra' fatto un giro completo.

Se per esempio ogni risposta richiedesse 10 secondi e usassimo 100 connessioni allora le prime 100 richieste verranno inviate quasi istantaneamente. Alla 101-esima la dispose blocchera' per 10 secondi, per poi registrare la prima risposta sul db. Alla 102-esima la dispose non blocchera', visto che sono gia' passati 10 secondi!

Idealmente se le risposte richiedessero esattamente 10 secondi l'una avremmo un atteggiamento a ondate dove partono 100 richieste e poi si blocca, poi altre 100 e poi di nuovo si blocca.

Richieste a piu' fasi

Prima abbiamo immaginato una richiesta a due fasi, dove in pratica si deve prima inviare una richiesta e dopo una seconda... ma come farlo usando il wrapper?

In realta' abbiamo gia' quasi tutto pronto, basta che la prima closure INTERNAL anziche' ritornare undef ritorni la seconda fase:

        sub async
        {
                my ($self, $request, $on_success, $on_error) = @_;
                die "already in use" if $self->[1]; # morte agli stacanovisti

                $self->[2] = $on_error;
                $self->[0]->send($request->head);

                $self->[1] = sub { # <-------------------------- INTERNAL prima fase
                        my $ack = $self->[0]->recv();

                        # se NAK, cancelliamo l'operazione
                        unless($ac->is_ok()) {
                                $ack->is_ok() or $on_error("NAK", $request, $ack);
                                return undef;
                        }

                        $self->[0]->send($request->body);
                        return sub { # <------------------------ INTERNAL seconda fase
                                my response = $self->[0]->recv();

                                if($response->is_ok()) { $on_success->($response); }
                                else { $on_error->("RESPONSE KO", $request, $response);

                                return undef;
                        };
                }
        }

Vediamo meglio cosa abbiamo aggiunto:

La close INTERNAL, come gia' detto, ora ritorna una seconda closure. In questo modo la prima chiamata a dispose non finira' il lavoro subito ma registrera' una seconda fase.

Abbiamo aggiunto anche una seconda callback on_error che verra' chiamata se qualcosa va storto sia nella prima fase che nella seconda.

Dobbiamo inoltre modificare l'uso, visto che una dispose potrebbe non esser abbastanza:

        my @conns = map { Conn->new(openConn()); } (1..$NTASK);
        my $db = openDB();
        my @list = fetchList();
        my $cid = 0;
        foreach my $item (@list)
        {
                my $conn;
                while(1) {
                        # prendo una connessione in round-robin
                        $conn = $conns[$cid++ % $NTASK];
        
                        # chiamo dispose cosi' da sbrigare il lavoro arretrato
                        $conn->dispose;

                        # se ha finito... altrimenti avanti il prossimo
                        last unless $conn->is_working;
                }

                $conn->async($request, sub { # callback
                        my $result = @_;
                        $db->storeResult($request, $result);
                });
        }

        # finiamo le cose ancora in sospeso
        map { $_->dispose() while $_->is_working(); } @conns;

Qui le cose si complicano un pochino, ma ormai avete raggiunto la meta' di questo articolo, tanto vale finire no?

Immaginiamo sempre il nostro caso teorico dove ogni richiesta richiede 10 secondi e abbiamo 100 connessioni. Per le prima 100 richieste, come gia' succedeva', invieremo la prima parte della richiesta quasi istantaneamente.

Arrivati alla 101-esima richiesta, pero', le cose cambiano: Infatti si chiamera' la dispose sulla prima connessione che blocchera' fino ad ottenere la risposta della prima fase, e inviera' la richiesta per la seconda fase.

Purtroppo pero' la connessione NON e' ancora disponibile, si dovra' dunque iterare nel ciclo e tentare con la seconda, che non blocchera' piu' per 10 secondi ma si spostera' anch'essa alla seconda fase e via dicendo, fino a tornare alla prima.

La prima nuovamente blocchera' per 10 secondi, terminera' dunque il lavoro e ritornera' disponibile all'uso, procendendo dunque all'effettivo invio della 101-esima richiesta, 20 secondi dopo.

Le successive richieste non blocceranno (come per la 101-esima): esattamente come volevamo.

IO::Select

Il sistema sopra funziona egregiamente quando i tempi di risposta sono abbastanza costanti, ma quando alcune risposte impiegano molto piu' di altre allora possono sorgere problemi di efficienza.

Immaginiamo di avere solo 2 istanze di wrapper, al primo e' stata fatta una richiesta che richiedera' 10 secondi mentre al secondo una richiesta che richiedera' 1 secondo.

Quando arrivera' la terza richiesta, questa blocchera' per 10 secondi sul primo slot anche se avrebbe potuto aspettare solo 1 secondo se avesse usato il secondo.

Se il nostro wrapper usa direttamente degli handler per comunicare, e' possibile usare la syscall select per sapere, data una lista di handlers, quale di questi handlers ha dati disponibili per la lettura.

Per semplificare la gestione useremo il modulo the IO::Select manpage, ecco un esempio:

        use IO::Select;

        my $sel = IO::Select->new();
        map { $sel->add($_); } @handlers;
        # ....
        my @ready = $sel->can_read($timeout);

Nell'array @ready otteniamo alcuni degli handlers aggiungi a $sel, vale a dire quelli che hanno dati disponibili per la lettura.

Il parametro $timeout indica quanto tempo aspettare per avere almeno 1 filehandle. Cio' significa che se uno o piu' handler e' gia' pronto ritornera' subito, oppure aspettera fino ad un massimo di $timeout secondi prima di arrendersi e tornare una lista vuota.

Modifichiamo quindi radicalmente il nostro wrapper e rendiamolo un vero pool di connessioni wrapped:

        package Pool;
        use IO::Select;
        use IO::Socket;

        sub new {
                my ($class, %args) = @_;
                bless { 
                        %args,
                        _slots = {},
                        _sel = IO::Select->new();
                }, $class;
        }

        sub _create {
                my ($self) = @_;
                my $slot = [
                        IO::Socket::INET->new($self->{host}),
                        undef, # INTERNAL
                        undef, # ON_ERROR
                ];
                my $fd = $slot->[0]->fileno or die;
                $slot->[0]->autoflush(1);
                $self->{_slots}->{$fd} = $slot;
                $self->{_sel}->add($slot->[0]);
                return $slot;
        }

        sub _find {
                my ($self) = @_;
                $self->dispose(0);
                while(1) {
                        my $ct = 0;
                        foreach my $slot (values %{$self->{_slots}})
                        {
                                $ct++;
                                return $slot unless $slot->is_working();
                        }
                        return $self->_create() if $ct < $self->{size};
                        $self->dispose(0.5);
                }
        }

        sub close {
                my ($self) = @_;

                OUTER: while(1) {
                        foreach my $slot (values %{$self->{_slots}})
                        {
                                if($slot->[1]) {
                                        $self->dispose(0.5);
                                        continue OUTER;
                                }
                        }
                        break;
                }
        }

        sub dispose {
                my ($self, $timeout) = @_;

                # get what is ready for data
                my @ready = $self->{_sel}->can_read($timeout);

                foreach my $fh (@ready) {
                        # get the slot
                        my $fd = $fh->fileno;
                        my $slot = $self->{_slot}->{$fd};

                        # invoke che INTERNAL
                        eval { $slot->[1] = $slot->[1]->(); };
                        next unless $@;
                        my $err = $@;

                        # remove the slot
                        delete $self->{_slot}->{$fd};
                        $self->{_sel}->remove($slot->[0]);

                        # call the on_error
                        eval { $slot->[2]->($err); };
                }
        }

        sub async {
                my ($self, $req, $on_success, $on_error) = @_;

                # find a slot
                my $slot = $self->_find();
                my $fh = $slot->[0];

                # register the on_error
                $slot->[2] = $on_error;

                # send first part of request
                $fh->print($req->head);

                # register the response handler 
                $slot->[1] = sub {
                        my $ack = <$fh>;

                        # die, the dispose() will fire the on_error
                        die "NAK" unless $ack =~ /^OK/;

                        # send the second part of the request
                        $fh->print($req->body);

                        # register the new handler
                        return sub {
                                my $res = <$fh>;
                                die "KO" unless $res =~ /^200/;

                                # all went fine, call on_success
                                $on_success->($res);

                                # free the slot
                                return undef;
                        };
                };
        }

Molte cose nuove, per prima cosa abbiamo un oggetto unico che conterra' tutte le connessioni aperte, definito con un massimo di connessioni disponibili.

Il metodo dispose() deve ora gestire il gruppo intero di connessioni, usando il selettore IO::Select creato nel costruttore.

Accetta anche un parametro di $timeout che indica quanto tempo aspettare al massimo.

Il metodo async() e' simile al precedente, ma chiamera' il metodo interno _find() per scoprire quale slot usare invece di fare round-robin.

Il vantaggio, oltre che l'efficienza, e' una piu' semplice usabilita' all'esterno:

        my $pool = Pool->new(
                host => '1.2.3.4:1234',
                size => 100,
        );
        my $db = openDB();

        foreach my $req (@list) 
        {
                $pool->async( $req, 
                  sub {
                      my ($result) = @_;
                      $db->storeResult($req, $result);
                  }, 
                  sub {
                      warn "error while $req:  @_";
                });
        }
        $pool->close();

Il metodo _find() per prima cosa vede di chiamare la dispose(), quindi se non trova slot liberi verifica quanti sono aperti, se sono meno del limite crea una nuova connessione, altrimenti richiama la dispose() passando un timeout e aspetta che qualcuno si liberi.

Visto che _find() viene chiamato ad ogni chiamata a async() possiamo evitare di chiamare il metodo dispose() direttamente, a meno che non serva controllare la coda senza aggiunger nulla.

Usando the IO::Select manpage abbiamo inoltre risolto il problema delle richieste a tempo non costante: se avessimo come prima massimo 2 connessioni impegnate dove la prima rispondera' fra 10 secondi e la seconda solo dopo 1 secondo, la chiamata a async() iniziera' a chiamare _find() che non potendo creare una connessione nuova chiamera' la dispose() anche piu' volte, dopo qualche tentativo finalmente otterremo la risposta sul secondo slot rendendolo disponibile e _find() ritornera' questo slot permettendo cosi' l'invio del terzo messaggio dopo 1 secondo.

Cosa aggiungere ora?

Il sistema usato sopra non fa piu' round-robin ma gestisce le connessioni in ordine sparso.

E' possibile quindi aggiungere altri parametri allo slot, come un timeout per la lettura e un idle_time oltre il quale chiudere la connessione.

In questo modo le connessioni oltre ad aprirsi quando richiesto possono anche chiudersi dopo un tempo di idle.

E' possibile inoltre distruggere le connessioni dopo qualche migliaio di usi, cosi' da liberare eventuali risorse allocate lato server.

Per fare questo aggiungiamo a $self->{_next_expire_check} e aggiungiamo allo slot un quarto parametro last_use:

        sub dispose {
                my ($self, $timeout) = @_;

                # get what is ready for data
                my @ready = $self->{_sel}->can_read($timeout);
                my %closing;

                foreach my $fh (@ready) {
                        # get the slot
                        my $fd = $fh->fileno;
                        my $slot = $self->{_slot}->{$fd};

                        # invoke che INTERNAL
                        eval { 
                                $slot->[1] = $slot->[1]->(); 
                                $slot->[3] = time(); # aggiorno last_use
                        };
                        next unless $@;

                        # mark for closing
                        $closing{$fd} = $@;
                }

                next unless time() > $self->{_next_expire_check};
                $self->{_next_expire_check} = time();
                
                foreach my $slot ( values %{ $self->{_slots} } )
                {
                        my $timeout = $slot->[1]   # e' in uso?
                                ? $self->{read_timeout} 
                                : $self->{idle_timeout};

                        # confronto il last_use
                        next unless $slot->[3]+$timeout < time();

                        # se e' passato troppo tempo
                        $closing{$slot->[0]->fileno} ||= "TIMEOUT";
                }
                
                # per tutte quelle in closing...
                while( my($fd, $err) = each %closing )
                {
                        my $slot = $self->{_slots}->{$fd};

                        # remove the slot
                        delete $self->{_slots}->{$fd};
                        $self->{_sel}->remove($slot->[0]);

                        # call the ERROR callback if still in 
                        eval { $slot->[2]->($err); } if $slot->[1];
                }
        }

e aggiungiamo alla sub async un reset del last_use alla fine prima di uscire:

        $slot->[3] = time();

ora non serve altro che impostare questi valori durante la costruzione:

        my $pool = Pool->new(
                host => '1.2.3.4:1234',
                size => 100,
                read_timeout => 30, 
                idle_timeout => 120,
        );

E il resto rimane uguale, ora le connessioni che non rispondono in 30 secondi chiameranno la callback di errore con TIMEOUT mentre quelle non usate per 120 secondi verranno chiuse silenziosamente.

Conclusioni

Certo si e' dovuto scrivere un mucchio di codice, e la gestione degli errori e' complessa, ma i vantaggi sono evidenti: Un solo task gestisce piu' connessioni, una sorta di reattore.

E' necessario pero' tener presente che la gestione degli errori puo' esser complessa!