Metodologie di Programmazione: Lezione 16

Riccardo Silvestri

Coordinare task

A volte una computazione può essere efficacemente suddivisa in task che possono essere eseguiti in modo concorrente o parallelo. Però i task potrebbero non essere completamente indipendenti e per poter ottenere il risultato finale occorre coordinarne l'esecuzione. Inoltre, la particolare suddivisione operata e il coordinamento usato possono influenzare fortemente l'efficienza dell'intera computazione. L'efficienza non riguarda solamente il tempo di esecuzione ma anche altre risorse come il numero di thread e la memoria usata.

In questa lezione useremo come esempio la computazione della somma totale dei byte dei file contenuti in una directory. Mostreremo molte implementazioni differenti a partire da quella sequenziale. Diversi modi di suddividere la computazione in task e diversi modi di coordinare l'esecuzione concorrente dei task saranno discussi, sperimentati e confrontati. Questo ci permetterà di introdurre alcuni dei principali strumenti offerti dalla libreria di Java per l'esecuzione concorrente di task e per il loro coordinamento. Le diverse implementazioni concorrenti che vedremo rispecchiano alcuni design pattern e criteri la cui applicabilità è molto più ampia di quella sottesa dalla particolare computazione presa come esempio.

La computazione e il testing

Come già menzionato, la computazione che vogliamo implementare è la somma dei byte di tutti i file contenuti in una directory, a qualsiasi livello. Essendo una computazione che riguarda i file, vogliamo prima di tutto effettuare alcune riorganizzazioni del codice o più precisamente code refactoring1. Creiamo un nuovo package mp.file e spostiamo la classe TestFile dal package mp al nuovo package. Poi creiamo una nuova classe Utils in mp.file per metodi di utilità per i file. Spostiamo quindi i metodi wordMap, fileTreeToString e test_fileTreeToString da mp.util.Utils nella nuova classe.

Scriviamo ora l'implementazione sequenziale e ricorsiva in mp.file.Utils. Per semplicità ignoriamo gli errori di I/O2, inoltre consideriamo solamente file regolari (ignorando ad esempio link simbolici) perché il metodo long Files.size(Path p) che ritorna il numero di byte del file p è garantito funzionare solamente per file regolari.

private static final LinkOption NOL = LinkOption.NOFOLLOW_LINKS;

/** Ritorna il numero totale di byte contenuti nella directory specificata. Se
 * non è una directory, ritorna 0. I link simbolici non sono seguiti, solamente
 * i file regolari sono conteggiati e tutti gli errori di I/O sono ignorati.
 * @param d  percorso di una directory
 * @return il numero totale di byte contenuti nella directory */
public static long totalSize(Path d) {
    long size = 0;
    try (Stream<Path> list = Files.list(d)) {
        for (Path p : list.toArray(Path[]::new)) {
            if (Files.isDirectory(p, NOL)) {
                size += totalSize(p);
            } else if (Files.isRegularFile(p, NOL))
                size += Files.size(p);       // Files.size è garantito
        }                                    // solo per file regolari
    } catch (IOException ex) { }  // Ignora errori di I/O
    return size;
}

Per misurare la memoria durante i test ci occorrono alcuni metodi che aggiungiamo alla classe mp.util.Utils

/** Ritorna una stima della memoria totale (heap e non-heap) attualmente usata.
 * @return una stima della memoria totale attualmente usata */
public static long getUsedMem() {
    long curr = 0;
    for (MemoryPoolMXBean m : ManagementFactory.getMemoryPoolMXBeans())
        curr += m.getUsage().getUsed();
    return curr;
}

/** Ritorna il picco della memoria totale usata da quando la JVM è partita o
 * dall'ultima volta che è stato fatto un reset (vedi
 * {@link mp.util.Utils#resetPeakMem()}).
 * @return il picco della memoria totale usata */
public static long getPeakMem() {
    long peak = 0;
    for (MemoryPoolMXBean m : ManagementFactory.getMemoryPoolMXBeans())
        peak += m.getPeakUsage().getUsed();
    return peak;
}

/** Imposta il picco della memoria totale alla memoria attualmente usata */
public static void resetPeakMem() {
    ManagementFactory.getMemoryPoolMXBeans()
            .forEach(java.lang.management.MemoryPoolMXBean::resetPeakUsage);
}

Adesso scriviamo un metodo che ci permette di fare i test delle diverse implementazioni che andremo a scrivere. Il metodo misura il tempo d'esecuzione, il massimo numero di thread addizionali usati e il picco della memoria usata durante il test. Inoltre permette anche di ripetere l'esecuzione un numero di volte specificato. Quindi nella classe mp.file.TestFile definiamo,

/** Mette alla prova un metodo che preso in input il percorso di una directory
 * ritorna la somma di tutti i byte dei file regolari contenuti nella
 * directory. Stampa il valore ritornato, il minimo, il massimo e la media dei
 * tempi di esecuzione relativamente alla n invocazioni. Inoltre, stampa i
 * picchi, registrati durante il test, del numero di thread addizionali usati e
 * della memoria addizionale usata.
 * @param name  nome del metodo
 * @param ts  permette di invocare il metodo
 * @param p  il percorso della directory
 * @param n  numero di volte che il metodo è invocato */
public static void test_ts(String name, Function<Path,Long> ts, Path p, int n) {
    out.println(name+"  Directory: "+p);
    long max = 0, min = -1, size = 0;
    double average = 0;
    ThreadMXBean tm = ManagementFactory.getThreadMXBean();
    int nt = tm.getThreadCount();   // Numero attuale di thread
    tm.resetPeakThreadCount();
    long mem = Utils.getUsedMem();  // Memoria attualmente usata
    Utils.resetPeakMem();
    try {
        for (int i = 0; i < n; i++) {
            long time = System.currentTimeMillis();
            size = ts.apply(p);
            time = System.currentTimeMillis() - time;
            if (time > max) max = time;
            if (min == -1 || time < min) min = time;
            average += time;
        }
    } catch (Exception ex) { out.println(ex); }
    nt = tm.getPeakThreadCount() - nt;  // Picco numero thread addizionali
    mem = Utils.getPeakMem() - mem;     // Picco memoria addizionale
    average /= n;
    out.println(String.format("Size: %s  Time (seconds): min = %.2f "+
                    " max = %.2f ave = %.2f",
            Utils.toGMKB(size), min/1000.0, max/1000.0, average/1000.0));
    out.println("Picco numero thread addizionali: "+nt);
    out.println("Picco memoria addizionale: " + Utils.toGMKB(mem));
 }

Per i test usiamo una directory /usr piuttosto grande: 265541 file, 21872 directory su 15 livelli. L'invocazione per il test è

Path dir = Paths.get("/usr");
test_ts("totalSize", mp.file.Utils::totalSize, dir, 10);

Invochiamo il metodo da testare 10 volte perché i tempi possono cambiare in modo significativo a causa delle varie cache usate dal sistema operativo sottostante per le chiamate di sistema relative al file system. Il testing produce,

totalSize  Directory: /usr
Size: 7GB 495MB 645KB 536B  Time (seconds): min = 3.32 max = 5.65 ave = 4.29
Picco numero thread addizionali: 0
Picco memoria addizionale: 61MB 56KB 136B

La macchina usata ha 8 core e la figura qui sotto mostra l'attività dei core (campionati ogni secondo), durante il test

Il colore rosso indica l'attività relativa alle chiamate di sistema che nel nostro caso sono le chiamate per l'accesso al file system. Il verde indica invece l'attività svolta dal resto del codice.

Per preparare le versioni concorrenti ci conviene scrivere una versione non ricorsiva. Per questo possiamo usare una classe locale:

/** Implementazione non ricorsiva di {@link mp.file.Utils#totalSize(Path)}.
 * @param d  percorso di una directory
 * @return il numero totale di byte contenuti nella directory */
 public static long totalSizeNR(Path d) {
     class Tot {
         long total(Path d) {  // Ritorna numero totale di byte della dir d
             long size = 0;
             try (Stream<Path> list = Files.list(d)) {
                 for (Path e : list.toArray(Path[]::new)) {
                     if (Files.isDirectory(e, NOL)) {
                         size += new Tot().total(e);  // No chiamata ricorsiva
                     } else if (Files.isRegularFile(e, NOL))
                         size += Files.size(e);
                 }
             } catch (IOException ex) { }
             return size;
         }
     }
     return new Tot().total(d);
 }

Le prestazioni sono molto simili a quelle delle versione ricorsiva.

Implementazioni concorrenti

Consideriamo ora possibili implementazioni concorrenti. Iniziamo con quelle più semplici che usano in modo diretto alcuni degli esecutori forniti dalla classe Executors.

Semplici esecutori

La prima implementazione parte dalla versione totalSizeNR e sostituisce l'invocazione diretta new Tot().total(e) con la sottomissione di un task, che esegue lo stesso calcolo, ma che sarà eseguito in un thread gestito da un esecutore. I Future<Long> ritornati dalle sottomissioni dei task sono poi richiesti completare invocando i loro metodi get. Così ogni task esegue essenzialmente ciò che era eseguito in una invocazione ricorsiva solo che quest'ultime sono sostituite dall'esecuzione, possibilmente in altri thread, di sub-task. Per adesso usiamo l'esecutore FixedThreadPool con molti thread, 1000, fra poco vedremo perché.

/** Implementazione concorrente di {@link mp.file.Utils#totalSize(Path)}. Esegue
 * un task per ogni directory. Il task per una directory sottomette i task per
 * le sub-directory, aspetta che completano e ritorna la somma dei byte di tutti
 * i file regolari contenuti nella directory.
 * @param d  percorso di una directory
 * @return il numero totale di byte contenuti nella directory */
public static long totalSizeNaiveConcur(Path d) {
    ExecutorService exec = Executors.newFixedThreadPool(1000);
    class Tot {
        long total(Path d) {
            long size = 0;
            try (Stream<Path> list = Files.list(d)) {
                List<Future<Long>> tasks = new ArrayList<>();
                for (Path e : list.toArray(Path[]::new)) {
                    if (Files.isDirectory(e, NOL)) {
                        tasks.add(exec.submit(() -> new Tot().total(e)));
                    } else if (Files.isRegularFile(e, NOL))
                        size += Files.size(e);
                }
                for (Future<Long> t : tasks)
                    size += t.get(50, TimeUnit.SECONDS);
            } catch (IOException ex) {
            } catch (InterruptedException | ExecutionException |
                    TimeoutException e) { throw new RuntimeException(e); }
            return size;
        }
    }
    try {
        return new Tot().total(d);
    } finally { exec.shutdown(); }
}

Abbiamo posto un timeout per i metodi get perché altrimenti, come vedremo presto, potremmo andare incontro ad un attesa infinita. Inoltre, abbiamo catturato le eccezioni controllate e l'abbiamo rilanciate come eccezioni non controllate, perché una lambda non può dichiarare di lanciare un'eccezione controllata. Provando il test con

test_ts("totalSizeNaiveConcur FixedThreadPool 1000", 
        mp.file.Utils::totalSizeNaiveConcur, dir, 10);

otteniamo

totalSizeNaiveConcur FixedThreadPool 1000  Directory: /usr
java.lang.RuntimeException: java.util.concurrent.ExecutionException:  
. . . java.util.concurrent.TimeoutException
Size: 0B  Time (seconds): min = -0.00 max = 0.00 ave = 0.00
Picco numero thread addizionali: 1000
Picco memoria addizionale: 128MB 217KB 24B

La figura sottostante mostra chiaramente ciò che è accaduto durante il test

Ognuno dei 1000 thread è rimasto bloccato da un task che era in attesa che un suo sub-task completasse. La figura mostra infatti che dopo un'iniziale breve attività c'è totale assenza di attività dovuta proprio alle attese. In altre parole, deve essere accaduto che i task di 1000 directory sono andati in esecuzione sui 1000 thread e ognuno di questi aveva almeno una sub-directory che ha indotto la sottomissione e l'attesa per il completamento di un sub-task. Ma i sub-task non potevano essere eseguiti perché tutti i thread disponibili erano già occupati ad eseguire i 1000 task. Quindi siccome abbiamo usato un esecutore di tipo FixedThreadPool che non può creare nuovi thread oltre il limite dato, le attese sarebbero durate all'infinito se non avessimo messo un timeout. Il blocco che si è venuto a creare è un esempio di stallo (deadlock) cioè una situazione in cui due o più processi si bloccano a vicenda, ogni processo per continuare deve aspettare che uno degli altri termini o faccia qualcosa che gli permetta di andare avanti. Nel nostro caso ogni thread era in attesa che un sub-task fosse eseguito da qualche thread, ma tutti i thread erano bloccati nello stesso modo e quindi nessuno di loro poteva eseguire i sub-task. Il termine più preciso per una situazione di stallo come questa è thread starvation deadlock.

Proviamo ad usare altri tipi di esecutori. Iniziamo con un CachedThreadPool che non pone nessun limite sul numero di thread che può creare, anzi ogni volta che ha un task da eseguire e tutti i thread sono impegnati, ne crea uno nuovo per eseguire il task. Sostituiamo la linea che imposta l'esecutore con

ExecutorService exec = Executors.newCachedThreadPool();

Provandolo otteniamo

totalSizeNaiveConcur CachedThreadPool  Directory: /usr
Size: 7GB 495MB 645KB 536B  Time (seconds): min = 1.02 max = 2.08 ave = 1.22
Picco numero thread addizionali: 1174
Picco memoria addizionale: 147MB 972KB 808B

Ora riesce a terminare ma come si vede usa ben 1174 thread. La figura qui sotto mostra perché il tempo d'esecuzione è decisamente migliore di quello della versione sequenziale.

Infatti gli 8 core sono tutti molto impegnati e il tempo medio è di 1.22 secondi contro 4.29, circa 3.5 volte più veloce.

Proviamo anche con l'esecutore WorkStealingPool. Questo esecutore a differenza del CachedThreadPool cerca di limitare il numero di thread usati, usando una coda per ogni thread in cui sono accodati i sub-task del task eseguito nel thread. Come il suo nome suggerisce, la caratteristica principale di questo esecutore è che quando un thread rimane senza task da eseguire "ruba" task che erano stati posti nella coda di esecuzione di un altro thread, se ve ne sono. Sostituiamo quindi la linea che imposta l'esecutore con

ExecutorService exec = Executors.newWorkStealingPool();

Provandolo otteniamo

totalSizeNaiveConcur WorkStealingPool  Directory: /usr
Size: 7GB 495MB 645KB 536B  Time (seconds): min = 2.33 max = 10.47 ave = 5.62
Picco numero thread addizionali: 613
Picco memoria addizionale: 262MB 658KB 920B

Rispetto al CachedThreadPool ha usato un numero minore di thread ma i tempi di esecuzione sono persino peggiori di quelli della versione sequenziale. La figura sottostante mostra l'attività dei core durante il test

Si nota immediatamente che nonostante tutti i core siano stati impegnati, la loro attività è dedicata maggiormente al coordinamento dei task e sub-task (le tacche verdi) mentre il lavoro più importante, cioè quello relativo al file system, rappresenta una frazione minoritaria.

I tre esecutori che abbiamo provato con questa prima versione concorrente non sono riusciti a coordinare ed eseguire in modo soddisfacente i task e i loro relativi sub-task. Il problema principale che ha messo in difficoltà gli esecutori è che i task non sono indipendenti. Infatti, un task relativo ad una directory sottomette i sub-task relativi alle sub-directory e poi aspetta il completamento di ognuno di questi. L'esecutore che si è comportato meglio è il CachedThreadPool ma ha usato un gran numero di thread e molta memoria addizionale. Ma ciò che è peggio è che il numero di thread che è costretto ad usare può crescere senza limiti in dipendenza del numero di sub-directory contenute a qualsiasi livello nella directory di input. Anche se i tempi di esecuzione sono, per la directory del test, abbastanza buoni l'uso di così tanti thread è insoddisfacente. Anche perché per alcune piattaforme (come Mac OS) c'è un limite di sistema sul numero massimo di thread che un processo può mantenere attivi simultaneamente.

Ridurre le dipendenze

La presenza di dipendenze tra i task rappresentano sempre un ostacolo per l'esecuzione concorrente. Se possibile conviene cercare di ridurre o meglio ancora eliminare le dipendenze. Nel nostro caso dovremmo definire i task in modo tale che non debbano aspettare il completamento dei loro sub-task. Così ogni task relativo ad una directory può terminare indipendentemente dagli altri task o sub-task. Un task quindi deve sottomettere i sub-task relativi alle sub-directory, ma non attendere il loro completamento, e ritornare solamente la somma dei byte dei file contenuti direttamente nella propria directory. Nel thread di invocazione sarà eseguito il task principale che aspetterà il completamento dei vari task relativi alle directory e sommerà i rispettivi risultati. Occorre però qualcosa che permetta al task principale di sapere quando il calcolo è terminato, cioè, quando tutti i task che sono stati sottomessi sono stati anche completati. Possiamo usare una variabile che mantiene il conteggio dei task pendenti, quelli che sono stati sottomessi ma che ancora non sono completati. Siccome questa variabile è aggiornata da più thread deve essere sincronizzata. Possiamo allora usare una variabile atomica. Potremmo usare un AtomicLong ma per i casi in cui ci possono essere molti accessi concorrenti può risultare più efficiente un LongAdder che al costo di usare un po' più di memoria risolve più efficientemente il contenzioso tra thread (thread contention3). Inoltre, siccome non importa l'ordine con il quale i risultati dei task sono sommati, possiamo usare un ExecutorCompletionService che è un esecutore che permette di ottenere i risultati di un gruppo di task in esecuzione nell'ordine in cui si completano. Infatti, il metodo Future<V> take() throws InterruptedException ritorna e rimuove il prossimo task che è completato, in caso aspetta il primo task che si completa. Un ExecutorCompletionService è costruito su un altro esecutore che gestisce i thread e come i task sono eseguiti dai thread. Volendo mantenere un numero basso di thread, possiamo usare un semplice FixedThreadPool con un numero di thread pari al numero di processori disponibili.

/** Implementazione concorrente di {@link mp.file.Utils#totalSize(Path)}. Esegue
 * un task per ogni directory. Il task per una directory sottomette i task per
 * le sub-directory e ritorna la somma dei byte dei file regolari direttamente
 * contenuti nella directory. Per eseguire i task usa un
 * {@link java.util.concurrent.CompletionService}. Per mantenere il conto dei
 * task ancora pendenti (cioè sottomessi ma non ancora completati) usa un
 * {@link java.util.concurrent.atomic.LongAdder}.
 * @param d  percorso di una directory
 * @return il numero totale di byte contenuti nella directory */
public static long totalSizeConcur(Path d)  {
    int np = Runtime.getRuntime().availableProcessors();
    ExecutorService pool = Executors.newFixedThreadPool(np);
    CompletionService<Long> exec = new ExecutorCompletionService<>(pool);
    LongAdder pending = new LongAdder();  // Tiene il conto dei task pendenti
    class Tot {
        long total(Path d) {  // Sottomette i task delle sub-directory e
            long size = 0;    // ritorna la somma dei byte dei file regolari
            try (Stream<Path> list = Files.list(d)) {
                for (Path e : list.toArray(Path[]::new)) {
                    if (Files.isDirectory(e, NOL)) {
                        pending.increment();   // Sottomette il sub-task per la
                        exec.submit(() -> new Tot().total(e)); // sub-directory
                    } else if (Files.isRegularFile(e, NOL))
                        size += Files.size(e);
                }
            } catch (IOException ex) { }
            return size;
        }
    }
    long total = 0;
    try {
        pending.increment();
        exec.submit(() -> new Tot().total(d));
        while (pending.sum() > 0) {       // Finché ci sono task pendenti,
            total += exec.take().get();   // chiedi il risultato di un task
            pending.decrement();          // e decrementa i task pendenti
        }
    } catch (InterruptedException | ExecutionException exc) {
    } finally { pool.shutdown(); }
    return total;
}

Il seguente diagramma mostra una possibile istantanea durante l'esecuzione di totalSizeConcur.

In blu sono indicati i task completati mentre in rosso sono quelli sottomessi ma non ancora completati. Come è mostrato può accadere che un sub-task può completarsi prima del task che lo ha sottomesso, proprio grazie all'indipendenza dei task. Testando il metodo sulla solita directory, otteniamo

totalSizeConcur  Directory: /usr
Size: 7GB 495MB 645KB 536B  Time (seconds): min = 0.79 max = 2.16 ave = 1.47
Picco numero thread addizionali: 10
Picco memoria addizionale: 71MB 809KB 832B

I tempi d'esecuzione sono paragonabili a quelli ottenuti dalla versione precedente che usava un CachedThreadPool. Però quest'ultimo ha usato più di 1000 thread mentre l'attuale versione ne usa solamente 8 (il fatto che il picco ne misura 10 potrebbe essere dovuto a qualche daemon thread creato dalla JVM). Inoltre anche il consumo di memoria addizionale è di molto inferiore e vicino a quello della versione sequenziale. La figura sottostante mostra l'attività dei core durante il test

Se confrontata con quella che usa il CachedThreadPool, si può notare che i core lavorano quasi a pieno regime in entrambe ma nella nuova versione le attività sono più ordinate ed omogenee. In accordo con il fatto che il coordinamento dei task e sub-task non è ostacolato da dipendenze.

Ridurre la sincronizzazione

Per favorire l'efficienza dell'esecuzione concorrente di più task, conviene ridurre la sincronizzazione o meglio ancora eliminarla, se possibile. La sincronizzazione è comunque un ostacolo per l'esecuzione parallela perché aumenta la thread contention e quindi la possibilità di tempi morti d'attesa. Per il nostro problema potremmo cercare di evitare di usare la variabile (sincronizzata) per il conteggio dei task pendenti. Invece di far sottomettere i sub-task direttamente dai task delle directory, potremmo modificare i task in modo tale che ritornino sia la somma dei byte dei file che la lista delle sub-directory. Così sarà solamente il task principale a sottomettere i task e sub-task e non ci sarà più bisogno di una variabile sincronizzata per il conteggio dei task pendenti.

/** Implementazione alternativa di {@link mp.file.Utils#totalSizeConcur(Path)}
 * che non usa variabili condivise mutabili. Un task per ogni directory che
 * ritorna la lista delle sub-directory e la somma dei bytes dei file regolari
 * direttamente contenuti nella directory. Tutti i task sono sottomessi nel
 * thread d'invocazione del metodo.
 * @param p  percorso di una directory
 * @return il numero totale di bytes contenuti nella directory */
public static long totalSizeConcur2(Path p)  {
    class Content {               // Il contenuto di una directory
        Content(long s, List<Path> list) {
            size = s;
            dirs = list;
        }
        final long size;          // Somma dei bytes dei file
        final List<Path> dirs;    // Lista delle sub-directory
    }
    Function<Path,Content> getCont = d -> { // Ritorna il contenuto della dir d
        long size = 0;                      // e non sottomette sub-task
        List<Path> dirs = new ArrayList<>();
        try (Stream<Path> list = Files.list(d)) {
            for (Path e : list.toArray(Path[]::new)) {
                if (Files.isDirectory(e, NOL)) {
                    dirs.add(e);
                } else if (Files.isRegularFile(e, NOL))
                    size += Files.size(e);
            }
        } catch (IOException ex) { }
        return new Content(size, dirs);
    };
    int np = Runtime.getRuntime().availableProcessors();
    ExecutorService pool = Executors.newFixedThreadPool(np);
    CompletionService<Content> exec = new ExecutorCompletionService<>(pool);
    long total = 0, pending = 0;
    try {
        pending++;
        exec.submit(() -> getCont.apply(p));
        while (pending > 0) {
            Content c = exec.take().get();  // Il prossimo task completato
            total += c.size;
            pending += c.dirs.size() - 1;
            for (Path d : c.dirs)           // Sottomette i sub-task del task
                exec.submit(() -> getCont.apply(d));
        }
    } catch (InterruptedException | ExecutionException e) {
    } finally { pool.shutdown(); }
    return total;
}

Provandolo otteniamo

totalSizeConcur2  Directory: /usr
Size: 7GB 495MB 645KB 536B  Time (seconds): min = 0.93 max = 2.36 ave = 1.28
Picco numero thread addizionali: 9
Picco memoria addizionale: 72MB 628KB 552B

Si può notare che tempi di esecuzione sono leggermente migliorati rispetto alla versione precedente. Questa è un'indicazione che probabilmente i contenziosi tra i thread per l'aggiornamento della variabile sincronizzata pending hanno un costo. Tuttavia c'è anche da considerare che il codice di quest'ultima versione è decisamente più lungo e complesso.

Produttori e consumatori

Uno dei più tipici design pattern per l'esecuzione concorrente è quello chiamato produttore-consumatore (producer-consumer). Permette di separare l'identificazione del lavoro dalla sua esecuzione inserendo i lavori da svolgere in una coda perché siano successivamente prelevati dalla coda ed eseguiti. Il design pattern produttore-consumatore permette di semplificare l'implementazione concorrente eliminando possibili dipendenze tra i produttori di lavoro e i consumatori (cioè i consumatori dei risultati dei lavori eseguiti) e può migliorare anche la gestione dei carichi di lavoro permettendo una distribuzione più equilibrata. Può essere usato in una gran varietà di contesti. In un web browser, durante la lettura di una pagina, un produttore può demandare i task di scaricamento delle immagini (o altre risorse) della pagina ad altrettanti consumatori che le restituiscono pronte per essere visualizzate. Un'applicazione per la ricerca in file che soddisfano certi criteri, può usare questo design pattern per distribuire a vari consumatori i compiti di eseguire le ricerche nei file selezionati. Molti esecutori come FixedThreadPool usano internamente il design pattern produttore-consumatore dove i task sottomessi (da un produttore che usa l'esecutore) sono inseriti in una coda da cui saranno prelevati ed eseguiti in uno dei thread attivi.

Per il nostro problema possiamo usare una coda in cui ogni task, relativo ad una directory, inserisce nella coda le sub-directory direttamente contenute nella directory e aggiorna il conteggio dei byte sommando i byte dei file direttamente contenuti nella directory. Le sub-directory sono prelevate dalla coda dal task principale che per ognuna di esse crea un task che sottomette per l'esecuzione. Il task principale deve sapere quando tutti i task sono completati. Possiamo fare in modo che immediatamente prima di terminare un task aggiunga alla coda un elemento che serve a marcare la fine del task. Purtroppo non possiamo usare il null per marcare la fine di un task perché quasi tutte le code con accettano il valore null. Allora come elementi della coda usiamo valori del tipo Optional<Path>, così un elemento della coda può rappresentare un Path o marcare la fine di un task tramite un Optional vuoto (cioè il valore ritornato da Optional.empty()). Chiaramente la coda dovendo essere usata da più thread deve essere sincronizzata. Il package java.util.concurrent offre molti tipi diversi di code sincronizzate. Nel nostro caso possiamo usare la coda illimitata ConcurrentLinkedQueue<E>.

/** Implementazione concorrente di {@link mp.file.Utils#totalSize(Path)}. Usa
 * una coda sincronizzata {@link java.util.concurrent.ConcurrentLinkedQueue} e
 * un contatore atomico {@link java.util.concurrent.atomic.LongAdder} per la
 * somma dei byte dei file. Il task per una directory aggiunge alla coda le
 * sub-directory e aggiorna il contatore atomico con i byte dei file regolari
 * direttamente contenuti nella directory. Per eseguire i task usa un
 * {@link java.util.concurrent.Executors#newFixedThreadPool(int)}.
 * @param p  percorso di una directory
 * @return il numero totale di bytes contenuti nella directory */
public static long totalSizeQueue(Path p) {
    ConcurrentLinkedQueue<Optional<Path>> queue = new ConcurrentLinkedQueue<>();
    LongAdder size = new LongAdder();  // Per sommare i byte dei file
    Consumer<Path> task = d -> {       // Somma i byte dei file a size e accoda
        try (Stream<Path> list = Files.list(d)) {  // le sub-dir
            for (Path e : list.toArray(Path[]::new)) {
                if (Files.isDirectory(e, NOL)) {
                    queue.add(Optional.of(e));   // Accoda la sub-directory
                } else if (Files.isRegularFile(e, NOL))
                    size.add(Files.size(e));     // Aggiorna size
            }
        } catch (IOException ex) { }
        queue.add(Optional.empty());     // Per marcare la fine del task
    };
    int np = Runtime.getRuntime().availableProcessors();
    ExecutorService exec = Executors.newFixedThreadPool(np);
    long pending = 0;
    try {
        pending++;
        exec.submit(() -> task.accept(p));
        while (pending > 0) {              // Finché ci sono task pendenti
            Optional<Path> op = queue.poll();  // Il primo elemento in coda
            if (op != null) {            // Se c'è,
                if (op.isPresent()) {    // ed è relativo a una sub-directory
                    pending++;           // sottomette il relativo task
                    exec.submit(() -> task.accept(op.get()));
                } else                   // se invece marca il completamento
                    pending--;           // di un task, decrementa il contatore
            }
        }
    } finally { exec.shutdown(); }
    return size.sum();
}

Provandolo, otteniamo

totalSizeQueue  Directory: /usr
Size: 7GB 495MB 645KB 536B  Time (seconds): min = 1.41 max = 2.11 ave = 1.61
Picco numero thread addizionali: 9
Picco memoria addizionale: 72MB 102KB 744B

I tempi sono leggermente peggiorati. Comunque l'attività dei core sembra essere ben bilanciata.

Fork-Join

Java offre a partire dalla versione 7 un framework basato sull'esecutore ForkJoinPool che può eseguire ForkJoinTask<V>. Il framework è appositamente studiato per l'esecuzione di un grandissimo numero di task indipendenti o con poche dipendenze e che non invocano metodi sincronizzati o comunque bloccanti. Generalmente, un'esecuzione inizia con un ForkJoinTask che è esplicitamente sottomesso a un ForkJoinPool o implicitamente sottomesso al pool di default ForkJoinPool.commonPool() tramite uno dei metodi fork(), invoke() o affini. Poi il task iniziale, tipicamente, produce altri sub-task che sono sottomessi per l'esecuzione sempre tramite i metodi fork, invoke, join e affini. A loro volta i sub-task operano in modo simile, come se fossero delle chiamate ricorsive.

Il metodo fork è simile al submit ma non è un metodo dell'esecutore ForkJoinPool ma di un oggetto ForkJoinTask che rappresenta appunto un task. L'effetto quindi è che il task sul quale è invocato il fork è sottomesso per l'esecuzione, ma in quale esecutore? Se il thread in cui avviene l'invocazione del fork fa parte di un ForkJoinPool allora il task è sottomesso in tale pool, altrimenti è sottomesso nel ForkJoinPool di default. Il metodo join è simile al get di un Future (un ForkJoinTask è un Future) ma non lancia eccezioni controllate come InterruptedException, quindi se invocato ritorna il risultato e se non è ancora completato attende fino a che si completa. Il metodo invoke è semanticamente equivalente a un fork seguito da un join però tende ad eseguire il task nel thread di invocazione. Il metodo statico <T extends ForkJoinTask<?>> Collection<T> invokeAll(Collection<T> tasks) di ForkJoinTask esegue il fork per ogni task e attende che i task siano tutti completati.

La caratteristica principale della gestione dell'esecuzione dei task da parte di un ForkJoinPool, sia esso esplicitamente creato o quello di default, è la seguente. Ogni thread ha un doppia coda (una deque) di task assegnategli, preleva i task da eseguire dalla testa della coda e aggiunge nuovi task da eseguire (ad esempio sub-task sottomessi dal task attualmente in esecuzione) sempre in testa alla coda. Inoltre, quando un thread non ha più task da eseguire sottrae, se c'è, un task dalla fine della coda di qualche altro thread. Quindi usa una gestione di tipo work stealing similmente al WorkStealingPool. Invero il WorkStealingPool usa internamente proprio un ForkJoinPool ma con un'importante differenza, i nuovi sub-task non sono aggiunti in testa alla coda ma sono accodati. Come vedremo tra poco, almeno per il nostro problema, questo può fare una grande differenza. Un'altra caratteristica che distingue un ForkJoinPool da altri esecutori è che usa daemon thread e quindi non è necessario effettuare lo shutdown.

La classe ForkJoinTask ha un metodo statico <T> ForkJoinTask<T> adapt(Callable<? extends T> callable) che ritorna un ForkJoinTask che esegue il Callable. Usando il ForkJoinPool di default non dobbiamo creare esplicitamente l'esecutore e così possiamo scrivere una versione che usa il framework Fork-Join modificando direttamente la prima versione, quella ricorsiva, di totalSize:

/** Implementazione tramite Fork-Join di {@link mp.file.Utils#totalSize(Path)}.
 * @param d  percorso di una directory
 * @return il numero totale di bytes contenuti nella directory */
public static long totalSizeForkJoin(Path d)  {
    long size = 0;
    List<ForkJoinTask<Long>> tasks = new ArrayList<>();
    try (Stream<Path> list = Files.list(d)) {
        for (Path e : list.toArray(Path[]::new)) {
            if (Files.isDirectory(e, NOL)) {
                tasks.add(ForkJoinTask.adapt(() -> totalSizeForkJoin(e)));
            } else if (Files.isRegularFile(e, NOL))
                size += Files.size(e);
        }
    } catch (IOException ex) { }
    for (ForkJoinTask<Long> t : ForkJoinTask.invokeAll(tasks))
        size += t.join();
    return size;
}

Provandolo, otteniamo

totalSizeForkJoin  Directory: /usr
Size: 7GB 495MB 645KB 536B  Time (seconds): min = 0.89 max = 1.31 ave = 0.99
Picco numero thread addizionali: 7
Picco memoria addizionale: 65MB 371KB 136B

Apparentemente questa è la versione più veloce, che usa il minor numero di thread e con memoria addizionale molto bassa. Non solo, il codice tra tutte le versioni concorrenti è il più breve e semplice. Da notare anche la grande differenza con il risultato della versione che usa il WorkStealingPool. Il fatto che i nuovi sub-task siano aggiunti in testa alle code di esecuzioni piuttosto che alla fine, per il nostro problema, è cruciale. Infatti l'esecutore tenderà a completare prima i task relativi alle directory più in profondità e poi via via quelle più vicine alla directory radice. Procederà, approssimativamente, dalle foglie dell'albero verso la radice. In questo modo si minimizzeranno i tempi d'attesa che ogni task dovrà sopportare per i sub-task relativi alle sub-directory. Ovviamente ciò è possibile perché in molti casi i sub-task sono eseguiti direttamente nello stesso thread del task che li ha sottomessi. Per questo un ForkJoinTask non è solamente un Future ma per certi versi è simile a un thread. Si tratta di un thread molto più leggero di un vero thread ma capace comunque di eseguire, al pari di un vero thread, più di un task.

Il framework Fork-Join è studiato per funzionare al meglio proprio per l'esecuzione di moltissimi task che sono creati seguendo una struttura ad albero (o comunque aciclica) in cui ogni task sottomette (ricorsivamente) altri sub-task e aspetta che questi completino. Non a caso il modo tipico di creare un ForkJoinTask, che è una classe astratta, è tramite una delle sue sotto-classi RecursiveAction e RecursiveTask<V>, si notino i nomi che iniziano con Recursive. Anch'esse sono astratte ma sono più semplici da implementare, la prima per task che non ritornano alcun valore e la seconda per task che ritornano un valore.

Abbiamo esplorato diversi strumenti e modi per suddividere una computazione in molti task e per coordinarne l'esecuzione. Alcuni di questi hanno un'applicabilità molto generale come il design pattern produttore-consumatore. Altri hanno una minore ampiezza di applicabilità, come il framework Fork-Join, ma il loro uso per alcuni problemi particolari può risultare estremamente efficace. L'efficacia di uno specifico modo per la suddivisione e il coordinamento dei task può non essere facile da determinare a priori. Spesso è necessaria un'adeguata sperimentazione i cui risultati possono però dipendere dalla particolare piattaforma hardware/software che si sta usando. In ogni caso la conoscenza del funzionamento dei vari strumenti messi a disposizione dalla libreria è utile non solo come guida per effettuare sperimentazioni ben mirate ma anche per spiegare i risultati delle sperimentazioni stesse. E alla fine per poter scegliere gli strumenti più adatti e usarli nel modo migliore.

Esercizi

[NaiveConcur2]    Scrivere una versione modificata di totalSizeNaiveConcur in cui i task delle sub-directory invece di essere immediatamente sottomessi per l'esecuzione, sono raccolti come Callable<Long> in una lista e alla fine sono sottomessi per l'esecuzione tutti insieme tramite il metodo List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit). Mettere alla prova la nuova versione confrontandola con quella originale.

[Queue2]    Scrivere una versione modificata di totalSizeQueue in modo tale che la coda abbia come elementi liste di path, cioè List<Path>. Così non c'è bisogno di marcare esplicitamente la fine di un task perché ogni task aggiunge alla coda solamente un elemento, cioè la lista dei percorsi delle sub-directory. Mettere alla prova la nuova versione confrontandola con quella originale.

[TotalSizeErrors]    Scrivere un metodo long totalSize(Path d, Collection<IOException> errs) che come totalSize ritorna la somma dei byte dei file contenuti nella directory d e in più se errs non è null, aggiunge a errs le eventuali eccezioni di I/O incontrate. Scrivere anche una versione concorrente e confrontarla con quella sequenziale.

[DirStat]    Scrivere un metodo String dirStat(Path d) che ritorna una stringa le seguenti informazioni statistiche circa la directory d: numero totale di file, numero totale di directory, massima profondità, massimo numero di file direttamente contenuti in una directory, massimo numero di sub-directory contenute in una directory, massimo numero di elementi (file/dir) contenuti in una directory e numero totale di errori di I/O. Scrivere anche varie implementazioni concorrenti e confrontarle.

[Find]    Scrivere un metodo List<Path> find(Path d, BiPredicate<Path, BasicFileAttributes> matcher) che ritorna la lista dei file/dir contenuti, a qualsiasi livello, nella directory d e che soddisfano il matcher. Scrivere anche una o più versioni concorrenti e confrontarle.

[StringSearch]    Scrivere un metodo Map<String,Set<Path>> search(Path d, Collection<String> ss) che ritorna una mappa che ad ogni stringa in ss associa l'insieme dei percorsi dei file nella directory d che contengono la stringa (eventualmente la ricerca può limitarsi ai file i cui nomi hanno certe estensioni come .txt, .html, ecc.). Scrivere una o più versioni concorrenti. Nel caso la mappa sia aggiornata da più thread, provare ad usare ConcurrentHashMap<K,V> o una versione sincronizzata come quella ritornata dal metodo <K,V> Map<K,V> synchronizedMap(Map<K,V> m) di Collections.

[ParallelSum]    Scrivere un metodo double parallelSum(double[] a) che ritorna la somma dei valori dell'array a la cui implementazione cerca di sfruttare al meglio il parallelismo. Si provi ad usare il framework Fork-Join e fare un confronto con un'implementazione sequenziale.

[ParallelOp]    Generalizzare l'esercizio precedente definendo un metodo <V> V parallelOp(V[] a, BinaryOperator<V> op) che ritorna il risultato dell'applicazione dell'operazione binaria op ai valori dell'array a. Si assume che l'operazione op sia associativa (ma non necessariamente commutativa) e che l'array abbia lunghezza almeno 2. Provarlo usando come tipo V un tipo numerico (ad es. Double o Integer) e operazioni come somma o max. Confrontarlo con un'implementazione sequenziale.

4 Mag 2015


  1. Il termine code rafactoring indica ristrutturazioni del codice come rinominare classi, metodi, o spostare metodi o classi da un package o da una classe ad un'altra e modifiche simili. I moderni IDE offrono meccanismi automatici per effettuare refactoring. Ad esempio l'IDE IntelliJ mette a disposizione nel menu Refactor le operazioni più comuni di refactoring.

  2. Gli errori più comuni che possono normalmente accadere riguardano file o directory che non possono essere visitate provocando il lancio di eccezioni del tipo AccessDeniedException.

  3. Con thread contention si intende una situazione in cui uno o più thread sono in attesa che un lock sia rilasciato da un altro thread. È chiamato così perché è come se ci fosse una contesa per conquistare un lock. In generale, le situazioni di thread contention dovrebbero essere ridotte al minimo perché ovviamente fanno perdere tempo.