Presentation
This page presents code snippets that illustrate the use of the multithreading API.
Some of the snippets presented below can be used online.
Additional snippets are available in directory: gatb-core/gatb-core/examples/tools.
Iteration in a multithreaded fashion
This snippet shows how to iterate over an Iterator object (here, a range of integers) with N threads in order to speed up the iteration.
It introduces the Dispatcher class and shows how to use it to parallelize a single iteration.
Note: this approach works only if the items can be iterated and processed independently of each other.
Code is from example multithreading1.cpp:
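// Functor called by the dispatcher for each item of the iteration.
struct Functor { void operator() (int i)
{
    // Process the current integer 'i' here.
}};

int main (int argc, char* argv[])
{
    // Number of cores to use; 0 lets the dispatcher use all available cores.
    size_t nbCores = (argc >=2 ? atoi(argv[1]) : 0);

    // Iterator over a range of integers.
    Range<int>::Iterator it (1,1000);

    Dispatcher dispatcher (nbCores);

    // Run the iteration in parallel and retrieve its status.
    IDispatcher::Status status = dispatcher.iterate (it, Functor(), 1);

    cout << "nbCores=" << status.nbCores << " time=" << status.time << endl;
}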
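To make the idea of dispatching independent items concrete, here is a minimal sketch in standard C++ only. It is not how Dispatcher is implemented: parallelFor and squares are hypothetical helpers written for this illustration, and the contiguous chunking strategy is an assumption.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <thread>
#include <vector>

// Hypothetical helper (not part of GATB): applies 'fn' to every integer in
// [first, last], giving each thread one contiguous chunk of the range.
template <typename Fn>
void parallelFor(int first, int last, std::size_t nbThreads, Fn fn)
{
    if (last < first) { return; }
    if (nbThreads == 0) {
        // hardware_concurrency() may return 0 when the value is unknown.
        nbThreads = std::max(1u, std::thread::hardware_concurrency());
    }
    const std::size_t total = static_cast<std::size_t>(last - first) + 1;
    const std::size_t chunk = (total + nbThreads - 1) / nbThreads;

    std::vector<std::thread> workers;
    for (std::size_t t = 0; t < nbThreads; ++t) {
        const std::size_t begin = t * chunk;
        if (begin >= total) { break; }
        const std::size_t end = std::min(total, begin + chunk);
        workers.emplace_back([=]() mutable {
            for (std::size_t k = begin; k < end; ++k) {
                fn(first + static_cast<int>(k));
            }
        });
    }
    for (std::thread& w : workers) { w.join(); }
}

// Each item writes only to its own slot, so no synchronization is needed.
std::vector<int> squares(int n, std::size_t nbThreads)
{
    assert(n >= 0);
    std::vector<int> out(static_cast<std::size_t>(n) + 1, 0);
    parallelFor(1, n, nbThreads, [&out](int i) { out[i] = i * i; });
    return out;
}
```

Calling squares(1000, 4) fills slots 1 to 1000 using four threads. The sketch is safe only because no two items touch the same memory, which is the independence condition stated in the note above.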
Multithreaded iteration and shared resources
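This snippet shows how to parallelize an iteration in which several threads modify a common resource.
The important point is that a shared resource must be modified cautiously when several threads run at the same time. The first iteration adds to sum1 with a plain +=, which is not atomic, so concurrent updates can be lost and the result may be wrong when more than one core is used. The second iteration uses the __sync_fetch_and_add intrinsic, which performs the addition atomically and yields the expected sum.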
Code is from example multithreading2.cpp:
int main (int argc, char* argv[])
{
    // Number of cores to use; 0 lets the dispatcher use all available cores.
    size_t nbCores = (argc >=2 ? atoi(argv[1]) : 0);

    int nmax = 10000;
    Range<int>::Iterator it (1,nmax);

    Dispatcher dispatcher (nbCores, 1);

    // Shared resources modified by all the threads.
    int sum1=0, sum2=0;

    // Unsafe: '+=' on a shared integer is not atomic.
    dispatcher.iterate (it, [&] (int i) { sum1 += i; });

    // Safe: the addition is performed atomically.
    dispatcher.iterate (it, [&] (int i) { __sync_fetch_and_add (&sum2, i); });

    cout << "First iteration: sum=" << sum1 << " (result should be " << nmax*(nmax+1)/2 << ")" << endl;
    cout << "Second iteration: sum=" << sum2 << " (result should be " << nmax*(nmax+1)/2 << ")" << endl;
}
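For readers who prefer a portable equivalent of the atomic addition, the following sketch uses std::atomic from standard C++ in place of __sync_fetch_and_add. The function atomicSum and its interleaved work split are assumptions made for this illustration; they are not part of the library.

```cpp
#include <atomic>
#include <cassert>
#include <thread>
#include <vector>

// Hypothetical example: sums 1..nmax with several threads that all update
// one shared counter. std::atomic makes each addition indivisible.
long atomicSum(int nmax, int nbThreads)
{
    assert(nbThreads > 0);
    std::atomic<long> sum(0);
    std::vector<std::thread> workers;
    for (int t = 0; t < nbThreads; ++t) {
        // Thread t handles t+1, t+1+nbThreads, t+1+2*nbThreads, ...
        workers.emplace_back([&sum, nmax, nbThreads, t] {
            for (int i = t + 1; i <= nmax; i += nbThreads) {
                // Relaxed ordering is enough here: only the final total
                // matters, and join() below synchronizes with each thread.
                sum.fetch_add(i, std::memory_order_relaxed);
            }
        });
    }
    for (std::thread& w : workers) { w.join(); }
    return sum.load();
}
```

Replacing std::atomic<long> with a plain long in this sketch would reintroduce the data race of the first iteration, and the program would no longer have defined behavior.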
Multithreaded iteration with synchronization of a shared resource
Here, the shared resource is a file, so we cannot use an atomic intrinsic as we did before for integer addition.
We need a general synchronization mechanism that ensures a portion of code is executed by only one thread at a time.
Code is from example multithreading3.cpp:
#include <fstream>

// Functor that writes each item to a file shared by all the threads.
struct Functor
{
    ISynchronizer* synchro; fstream& file;

    Functor (ISynchronizer* synchro, fstream& file) : synchro(synchro), file(file) {}

    void operator() (int i)
    {
        // Only one thread at a time can execute the code between lock and unlock.
        synchro->lock ();
        file << i << endl;
        synchro->unlock ();
    }
};

int main (int argc, char* argv[])
{
    size_t nbCores = (argc >=2 ? atoi(argv[1]) : 0);

    int nmax = 10000;
    Range<int>::Iterator it (1,nmax);

    // The shared resource: an output file.
    fstream file ("out", std::fstream::out);

    // The synchronizer that protects accesses to the file.
    ISynchronizer* synchro = System::thread().newSynchronizer();

    Dispatcher dispatcher (nbCores, 1);
    dispatcher.iterate (it, Functor(synchro,file));

    // Clean up.
    file.close();
    delete synchro;
}
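The same lock/unlock pattern can be sketched with std::mutex from standard C++. In this illustration a std::ostringstream stands in for the file so that the result can be inspected; writeAll is a hypothetical function, not a library call.

```cpp
#include <cassert>
#include <mutex>
#include <sstream>
#include <string>
#include <thread>
#include <vector>

// Hypothetical example: writes 1..nmax, one number per line, into a stream
// shared by several threads. The mutex serializes the writes.
std::string writeAll(int nmax, int nbThreads)
{
    assert(nbThreads > 0);
    std::ostringstream out;  // shared resource (stands in for the file)
    std::mutex mtx;          // protects 'out'
    std::vector<std::thread> workers;
    for (int t = 0; t < nbThreads; ++t) {
        workers.emplace_back([&out, &mtx, nmax, nbThreads, t] {
            for (int i = t + 1; i <= nmax; i += nbThreads) {
                mtx.lock();
                out << i << '\n';  // critical section: one thread at a time
                mtx.unlock();
            }
        });
    }
    for (std::thread& w : workers) { w.join(); }
    return out.str();
}
```

Every number is written exactly once and no line is interleaved with another, but the order of the lines depends on thread scheduling and can differ from one run to the next.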
Multithreaded iteration with synchronization of a shared resource (bis)
This snippet is similar to the previous one. It shows how to use the LocalSynchronizer class to lock and unlock the enclosing instruction block: the synchronizer stays locked for as long as the LocalSynchronizer object is in scope.
This avoids the classic deadlock that occurs when one forgets to unlock a synchronizer.
Code is from example multithreading4.cpp:
#include <fstream>

int main (int argc, char* argv[])
{
    size_t nbCores = (argc >=2 ? atoi(argv[1]) : 0);

    int nmax = 1000;
    Range<int>::Iterator it (1,nmax);

    // The shared resource: an output file.
    fstream file ("out", std::fstream::out);

    // The synchronizer that protects accesses to the file.
    ISynchronizer* synchro = System::thread().newSynchronizer();

    Dispatcher dispatcher (nbCores, 1);
    dispatcher.iterate (it, [&] (int i)
    {
        // Locks the synchronizer for the duration of this block.
        LocalSynchronizer sync (synchro);
        file << i << endl;
    });

    // Clean up.
    file.close();
    delete synchro;
}
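The scoped-locking idea can be illustrated with a small guard class in standard C++. This is a sketch of the general technique, not the actual LocalSynchronizer code: ScopedLock, record and recordAfterFailure are hypothetical names, and std::lock_guard offers the same behavior in the standard library.

```cpp
#include <cstddef>
#include <mutex>
#include <stdexcept>
#include <vector>

// Hypothetical guard: locks in the constructor, unlocks in the destructor.
class ScopedLock
{
public:
    explicit ScopedLock(std::mutex& m) : mtx(m) { mtx.lock(); }
    ~ScopedLock() { mtx.unlock(); }
    ScopedLock(const ScopedLock&) = delete;
    ScopedLock& operator=(const ScopedLock&) = delete;
private:
    std::mutex& mtx;
};

// Appends 'value' to 'log' under the lock. Throws on negative input while
// the lock is held; the guard's destructor still releases the mutex.
void record(std::mutex& m, std::vector<int>& log, int value)
{
    ScopedLock guard(m);
    if (value < 0) { throw std::invalid_argument("negative value"); }
    log.push_back(value);
}

// Shows that a failed call does not leave the mutex locked.
std::size_t recordAfterFailure()
{
    std::mutex m;
    std::vector<int> log;
    try { record(m, log, -1); } catch (const std::invalid_argument&) {}
    record(m, log, 42);  // would never return if the lock had leaked
    return log.size();
}
```

Because the unlock happens in a destructor, it runs on every exit path of the block, including early returns and exceptions, which a manual lock/unlock pair does not guarantee.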
Multithreaded iteration without shared resources management
This snippet introduces the ThreadObject class, designed to avoid concurrent access issues.
Instead of working on a single shared resource, each thread uses a local resource during the iteration; the local resources are then aggregated once the iteration is over.
This approach removes the need for the synchronization mechanisms required when threads directly use a single shared resource. This is worthwhile because synchronization mechanisms can introduce time overhead.
Code is from example multithreading5.cpp:
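int main (int argc, char* argv[])
{
    size_t nbCores = (argc >=2 ? atoi(argv[1]) : 0);

    int nmax = 1000;
    Range<int>::Iterator it (1,nmax);

    Dispatcher dispatcher (nbCores, 1);

    // Holds one local integer per thread, plus the global result.
    ThreadObject<int> sum;

    dispatcher.iterate (it, [&] (int i)
    {
        // sum() is the local object of the calling thread: no synchronization needed.
        sum() += i;
    });

    // Aggregate the local sums into the global one (*sum).
    sum.foreach ([&] (int localSum)
    {
        *sum += localSum;
    });

    cout << "sum=" << *sum << " (result should be " << nmax*(nmax+1)/2 << ")" << endl;
}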
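The "local resources, then aggregation" pattern can also be written by hand in standard C++, which may help to see what ThreadObject takes care of. The sketch below is an illustration under assumptions: localThenAggregate is a hypothetical function, and a vector with one slot per thread plays the role of the per-thread objects.

```cpp
#include <cassert>
#include <cstddef>
#include <numeric>
#include <thread>
#include <vector>

// Hypothetical example: each thread accumulates into its own local variable,
// and the partial results are combined after all threads have finished.
long localThenAggregate(int nmax, int nbThreads)
{
    assert(nbThreads > 0);
    std::vector<long> partial(static_cast<std::size_t>(nbThreads), 0L);
    std::vector<std::thread> workers;
    for (int t = 0; t < nbThreads; ++t) {
        workers.emplace_back([&partial, nmax, nbThreads, t] {
            long local = 0;  // private to this thread: no lock, no atomic
            for (int i = t + 1; i <= nmax; i += nbThreads) { local += i; }
            partial[t] = local;  // single write to this thread's own slot
        });
    }
    for (std::thread& w : workers) { w.join(); }

    // Aggregation runs on one thread, after the parallel part is over.
    return std::accumulate(partial.begin(), partial.end(), 0L);
}
```

The threads synchronize only once, at join time, instead of once per item as in the atomic or mutex-based versions.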
Multithreaded iteration of a bank
This snippet shows how to iterate over the sequences of a bank and count how many A, C, G, T and N it contains. The interesting part is that the Bank class can create Iterator instances that can be iterated through a Dispatcher instance.
Note: iterating a bank from disk involves a lot of I/O, so parallelizing such an iteration may not significantly improve performance. However, once the snippet has been run, the bank (if not too big) may be in the RAM cache, so it is worth rerunning the snippet with a varying number of cores to see how the execution time evolves.
Code is from example multithreading6.cpp:
int main (int argc, char* argv[])
{
    if (argc < 2)
    {
        cerr << "you must provide at least the FASTA file path." << endl;
        return EXIT_FAILURE;
    }

    // Open the FASTA bank given as first argument.
    BankFasta bank (argv[1]);

    // Number of cores to use; 0 lets the dispatcher use all available cores.
    size_t nbCores = (argc >=3 ? atoi(argv[2]) : 0);

    Dispatcher dispatcher (nbCores);

    // One counter per nucleotide, each with one local value per thread.
    ThreadObject<int> sumA, sumC, sumG, sumT, sumN;

    dispatcher.iterate (bank.iterator(), [&] (const Sequence& seq)
    {
        // Local counters of the calling thread.
        int& localA = sumA();
        int& localC = sumC();
        int& localG = sumG();
        int& localT = sumT();
        int& localN = sumN();

        for (size_t i=0; i<seq.getDataSize(); i++)
        {
            switch (seq.getDataBuffer()[i])
            {
                case 'A': localA++; break;
                case 'C': localC++; break;
                case 'G': localG++; break;
                case 'T': localT++; break;
                case 'N': localN++; break;
            }
        }
    }, 1 );

    // Aggregate the local counters into the global ones.
    sumA.foreach ([&] (int n) { *sumA += n; });
    sumC.foreach ([&] (int n) { *sumC += n; });
    sumG.foreach ([&] (int n) { *sumG += n; });
    sumT.foreach ([&] (int n) { *sumT += n; });
    sumN.foreach ([&] (int n) { *sumN += n; });

    cout << "|A|=" << *sumA << endl;
    cout << "|C|=" << *sumC << endl;
    cout << "|G|=" << *sumG << endl;
    cout << "|T|=" << *sumT << endl;
    cout << "|N|=" << *sumN << endl;
}
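To experiment with the counting logic without a FASTA file, the following sketch applies the same idea to sequences held in memory, using standard C++ only. The names Counts and countNucleotides are hypothetical, and a std::vector<std::string> stands in for the bank; none of this reflects how BankFasta works internally.

```cpp
#include <array>
#include <cassert>
#include <cstddef>
#include <string>
#include <thread>
#include <vector>

using Counts = std::array<long, 5>;  // index 0..4: A, C, G, T, N

// Hypothetical example: counts nucleotides over in-memory sequences with one
// private histogram per thread, merged after the threads have finished.
Counts countNucleotides(const std::vector<std::string>& seqs, std::size_t nbThreads)
{
    assert(nbThreads > 0);
    std::vector<Counts> local(nbThreads, Counts{});
    std::vector<std::thread> workers;
    for (std::size_t t = 0; t < nbThreads; ++t) {
        workers.emplace_back([&seqs, &local, nbThreads, t] {
            Counts mine{};  // private histogram, zero-initialized
            // Thread t handles sequences t, t+nbThreads, t+2*nbThreads, ...
            for (std::size_t s = t; s < seqs.size(); s += nbThreads) {
                for (char c : seqs[s]) {
                    switch (c) {
                        case 'A': ++mine[0]; break;
                        case 'C': ++mine[1]; break;
                        case 'G': ++mine[2]; break;
                        case 'T': ++mine[3]; break;
                        case 'N': ++mine[4]; break;
                        default: break;  // other characters are ignored
                    }
                }
            }
            local[t] = mine;  // publish once, into this thread's own slot
        });
    }
    for (std::thread& w : workers) { w.join(); }

    Counts total{};
    for (const Counts& c : local) {
        for (std::size_t k = 0; k < total.size(); ++k) { total[k] += c[k]; }
    }
    return total;
}
```

Since the data is already in memory, this version makes it easy to observe how the run time changes with nbThreads without the disk I/O effect described in the note above.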