并行复习提纲
并行算法概述
- 并行计算模型:
- RAM模型:内存无限,指令顺序执行,一次执行一条,所有指令花费单位时间,算法的执行时间取决于执行的指令数量。
- PRAM模型:每一个处理器可以配有局部内存,拥有一个全局共享内存所有处理器都可以访问。
- PRAM操作类型:
- 同步:处理器执行时会加锁,每执行一步处理器工作或者待机。
- 异步:处理器有局部时钟,用于同步处理器。
- PRAM操作类型:
- LogP模型:受并行计算机设计的影响,一种分布式内存多处理器的模型。处理器通信通过点对点的消息通信实现。基于logP的广播算法。
- BSP模型:P配有局部内存的处理器。使用点对点完成消息传递,或者使用共享变量的方式完成消息传递。超步代价的计算方法需要考虑计算阶段和通信阶段是否产生堆叠。
- 设计并行算法的基本方法
- 分解:分解成效地任务,开拓并发性。
- 递归分解:适合可以使用分治法进行解决的问题。
- 数据分解:划分数据,把数据分配给不同的任务。
- 探索分解:面对任务之间依赖事先未知的情况。
- 猜测分解:保守方法,当确认没有依赖时,把其识别成独立任务。乐观方法,即使可能是错误的,仍然调度任务。保守方法可能产生较少的并发,乐观方法可能需要回滚。
- 通信:确定诸任务之间的数据交换,检测划分的合理性。
- 组合:根据任务的局部性,组成更大的任务。
- 映射:把每个任务分配到处理器上,提高并行性能。
- 静态映射:把任务事先映射到进程上。基于数据划分的映射。基于任务图的映射。混合映射。
- 动态映射:运行期间,把任务映射到进程。
- 分解:分解成效地任务,开拓并发性。
- 并行算法模型:算法模型主要涉及选择划分方法以及映射技术以减少人物之间的交互。
- data parallel model
- master slave model
- producer consumer model
- task graph model
- hybrid model
- 给出了并行桶排序的示例,每一个桶分配一个处理器。
- 使用的并行计算模型是PRAM模型,数据划分方法对中间数据进行划分。
- 映射方法:基于数据划分的映射。算法模型:data parallel model
并行程序设计基础
- 简介
- 并行程序设计模型
- 共享地址空间(SAS)程序设计模型:任何进程在共享空间中可以命名任何变量。有load和store两种。在进程内顺序执行程序,在线程与线程之间会存在交叉。通过排序事件保证顺序性。
- 消息传递(MP)程序设计模型:不存在共享地址空间,进程可以直接命名局部变量。通过发送消息完成线程之间的通信。通过send和receive函数实现了点对点的同步。可以构建全局地址空间,使用进程id号和进程地址空间内部地址实现。
- 全局地址分区(PGAS)程序设计模型:全局地址空间,每一个线程可以看到全部数据,不需要发生数据复制操作,方便线程之间共享数据。把全局地址空间进行分割。
- 并行程序设计模式。构建并行程序需要以下几步:分解,分配任务,进行资源协调,完成映射。
- pipe and filter:图像检索系统,几乎所有的软件上层都有该结构。
- iterator:售票/列车员和对每个人的信息售票/检票。
- mapreduce:map阶段把数据分配给各计算单元进行处理。reduce阶段把map阶段的结果进行合并。
- blackboard/agent and repository:blackboard存放供所有的agents共享的结果。manager协调agent访问blackboard,并产生合并结果。举例:编译优化,程序的中间表达式存放在repo中,agent通过启发式规则优化优化中间代码,manager控制优化的agent存取程序的中间代码。
- process control:
- model view controller
- layered
- event-based coordination
流水线计算
概述
流水线计算是通过把问题分解成一系列依次完成的任务,每一个任务由单一的进程或处理器处理。
流水线应用类型
- 如果问题可以分解成一系列顺序任务,流水线方法可以提高如下三种类型计算的速度:
- 同时运行多个解决完整问题的示例
- 一系列数据项需要处理,且每一数据项需要多个操作
- 启动下一个步骤的信息可以在当前步骤结束之前先行传递给下一步骤
- 如果问题可以分解成一系列顺序任务,流水线方法可以提高如下三种类型计算的速度:
流水线应用实例
- 插入排序:可以使用流水线的方式完成数据的插入。
- 求解线性方程组:通过前一个式子得到一个变量的表达式,代入到下一个式子中,以此类推,最终求出所有的变量的值。
共享内存编程
简介
- 常见的共享内存的并行设计方法:
- 采用重量级进程
- 采用线程
- 采用全新的用于并行程序设计的语言,但是接受度比较低如:ada
- 修改现有的顺序程序设计语言语法,形成支持并行程序设计的语言,如cilk
- 在已有的顺序程序设计语言补充指定并行的编译指示。例如:OpenMP
- 在已有的顺序程序设计语言里引入库函数
- 常见的共享内存的并行设计方法:
cilk
- cilk简介
- 面向fork-join方式任务产生的编程模型适合递归算法。
- 不采用循环,而是递归的产生很多任务。
- 创建任务嵌套队列,调度器采用work stealing确保所有的核都忙。
- cilk程序实例
- Inlet
- Abort
- Lock
- cilk简介
TBB
- TBB简介
- C++library
- 与其他的线程包兼容
- 可扩展的数据并行程序设计
- 指定模板和任务,不仅仅线程。TBB把任务分配到线程,负责负载平衡。
- C++ review
- C++函数模板
- parallel algorithms
- 循环的并行:parallel_for(range,body,partitioner)是一个在值域执行并行迭代操作的模板函数,比如对数组求和。 上述函数表示在区域内的每一个值都执行body函数,partitioner选项指定了分割策略。
- 索引类型必须是整型
- 循环不能回环
- 步长必须为正,默认为1
- 并没有保证迭代操作以并行的方式进行
- 较小的迭代等待更大的迭代可能会发生死锁
- 分割策略总是auto_partitioner
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
using namespace std;
using namespace tbb;
typedef vector<int>::iterator IntVecIt;
struct body
{
void operator()(const blocked_range<IntVecIt>&r)const
{
for(auto i = r.begin(); i!=r.end(); i++)
cout<<*i<<' ';
}
};
int main()
{
vector<int> vec;
for(int i=0; i<10; i++)
vec.push_back(i);
parallel_for(blocked_range< IntVecIt>(vec.begin(), vec.end())
, body());
return 0;
}
- parallel_reduce模板在一个区域内迭代,将由各个任务计算得到的部分结果合并,得到最终结果。parallel_reduce对区域range的要求和parallel_for一样。body类型需要分割构造函数以及一个join方法。body的分割构造函数拷贝运行循环体需要的只读数据,并分配归并操作中初始化归并变量的标志元素。join方法会组合归并操作中各任务的结果。 上述程序是找数组中最小值的索引号和值。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
using namespace std;
using namespace tbb;
class MinIndexBody {
const float *const my_a;
public:
float value_of_min;
long index_of_min;
MinIndexBody ( const float a[] ) :
my_a(a), value_of_min(FLT_MAX), index_of_min(-1)
{ }
void operator()(const blocked_range<size_t>& r ){
const float* a = my_a;
for( size_t i = r.begin(); i != r.end(); ++i ){
float value = a[i];
if( value<value_of_min ) {
value_of_min = value;
index_of_min = i;
}
}
}
MinIndexBody(MinIndexBody& x, split ) :
my_a(x.my_a), value_of_min(FLT_MAX),
index_of_min(-1)
{}
void join(const MinIndexBody& y ) {
if( y.value_of_min < value_of_min ) {
value_of_min = y.value_of_min;
index_of_min = y.index_of_min;
}
}
};
// Find index of smallest element in a[0...n-1]
long ParallelMinIndex ( const float a[], size_t n ) {
MinIndexBody mib(a);
parallel_reduce(blocked_range<size_t>(0,n,GrainSize), mib );
return mib.index_of_min;
} - parallel_pipeline是流的并行算法。其定义为: 下面使用TBB完成读取一个文本文件,把每一个单词的首字母改写成大写,最后把修改后的文本写入到新文件。
1
2
3
4
5
6
7
8
9
10class pipeline {
public:
pipeline();
~pipeline();
void add_filter( filter& f );
void run( size_t max_number_of_live_tokens [, task_group_context& group] );
void clear();
};
void parallel_pipeline( size_t max_number_of_live_tokens, const filter_t<void,void>& filter_chain [, task_group_context& group] );1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71//main program
tbb::pipeline pipeline; // Create the pipeline
MyInputFilter input_filter( input_file ); // Create file-reading writing stage
pipeline.add_filter( input_filter ); //add it to the pipeline
MyTransformFilter transform_filter; // Create capitalization stage
pipeline.add_filter( transform_filter );// add it to the pipeline
MyOutputFilter output_filter( output_file ); // Create file-writing stage
pipeline.add_filter( output_filter ); // add it to the pipeline
pipeline.run( MyInputFilter::n_buffer ); // Run the pipeline
pipeline.clear(); // Must remove filters from pipeline before they are implicitly destroyed.
// Filter that writes each buffer to a file.
class MyOutputFilter: public tbb::filter {
FILE* my_output_file;
public:
MyOutputFilter( FILE* output_file );
void* operator()( void* item ); /*override*/
};
MyOutputFilter::MyOutputFilter( FILE* output_file ) : tbb::filter(serial),
my_output_file(output_file)
{ }
void* MyOutputFilter::operator()( void* item ) {
MyBuffer& b = *static_cast<MyBuffer*>(item);
fwrite( b.begin(), 1, b.size(), my_output_file );
return NULL;
}
// Changes the first letter of each word from lower case to upper case.
class MyTransformFilter: public tbb::filter {
public:
MyTransformFilter();
void* operator()( void* item ); /*override*/
};
MyTransformFilter::MyTransformFilter() : tbb::filter(parallel)
{}
/*override*/void* MyTransformFilter::operator()( void* item ) {
// a for loop and ‘toupper()’ go here…
}
//斐波那契数列
class FibTask: public task {
public:
const long n;
long* const sum;
FibTask( long n_, long* sum_ ) :
n(n_), sum(sum_)
{}
task* execute() { // Overrides virtual function task::execute
if( n<CutOff ) {
*sum = SerialFib(n);
} else {
long x, y;
FibTask& a = *new( allocate_child() ) FibTask(n-1,&x);
FibTask& b = *new( allocate_child() ) FibTask(n-2,&y);
set_ref_count(3); // 3 = 2 children + 1 for wait
spawn( b );
spawn_and_wait_for_all( a );
*sum = x+y;
}
return NULL;
}
};
- TBB task-scheduler
- 把任务映射到threads
- 处理负载平衡和调度
- 使用task scheduler必须初始化一个tbb::task_scheduler_init
- 创建一个新的根目录
- 分配任务对象
- 构造任务
- 负载平衡策略
- 任务窃取:不断地通过划分递归进行负载平衡。
- 支持并发的容器
- concurrent queue container
- concurrent vector container
- concurrent hash table
- concurrent queue container
- MUTEX
- Scalable:可扩展mutex通常比非扩展的mutex的性能差
- Fair:公平mutex采用先来先服务策略,避免了任务饿死。由于后面任务能跳过前面任务,不公平mutex通常性能要好
- Reentrant/recursive:reentrant mutex允许持有一mutex上锁的任务,可申请另一锁。这在递归场合有用,但它会让锁实现更加复杂
- SLEEP or SPIN MUTEX:Mutex可让等待任务spin in user space或sleep. 对于短时间等待,spinning in user space性能最好,因为让任务睡觉需要消耗时钟周期.长时间等待,sleep更好些,因为任务会让出处理器. 在单核支持多任务的处理器里,Spin性能较差,如Intel processors with hyperthreading technology.
- 其他
- TBB简介
OpenMP
- 简介
思考:使用TBB实现并行搜索目录及子目录,parallel_for和parallel_reduce只能处理没有依赖关系的任务。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37//首先写出串行版本的dfs
bool dfs(node* cur_node, string& name){
if(cur_node->name == name){
return true;
}
bool ret = false;
for(auto child : cur_node->children){
ret = ret | dfs(child);
}
return ret;
}
//考虑使用parallel_invoke
bool parallel_dfs(Node* cur_node, string& match_name){
if(cur_node->name == match_name){
return true;
}
atomic<bool> ret = false;
for(auto child : cur_node->children){
tbb::parallel_invoke([&]{ret |= parallel_dfs(child)});
}
return ret;
}
//考虑使用task_group
bool parallel_dfs(Node* cur_node, string& match_name){
if(cur_node->name == match_name){
return true;
}
atomic<bool> ret = false;
tbb:task_group g;
for(auto child : cur_node->children){
g.run([&]{ret |= parallel_dfs(child)});
}
g.wait();
return ret;
}
使用消息传递库进行编程(MPI)
- 简介
PPT讲的支离破碎
请直接参考链接
基本概念
- 第一个概念是通讯器(communicator)。通讯器定义了一组能够互相发消息的进程。在这组进程中,每个进程会被分配一个序号,称作秩(rank),进程间显性地通过指定秩来进行通信。
- 通信的基础建立在不同进程间发送和接收操作。一个进程可以通过指定另一个进程的秩以及一个独一无二的消息标签(tag)来发送消息给另一个进程。接受者可以发送一个接收特定标签标记的消息的请求(或者也可以完全不管标签,接收任何消息),然后依次处理接收到的数据。类似这样的涉及一个发送者以及一个接受者的通信被称作点对点(point-to-point)通信。
- 当然在很多情况下,某个进程可能需要跟所有其他进程通信。比如主进程想发一个广播给所有的从进程。在这种情况下,手动去写一个个进程点对点的信息传递就显得很笨拙。而且事实上这样会导致网络利用率低下。MPI 有专门的接口来帮我们处理这类所有进程间的集体性(collective)通信。
点对点通信
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16MPI_Send(
void* data,
int count,
MPI_Datatype datatype,
int destination,
int tag,
MPI_Comm communicator)
//data代表传输的数据,count代表最多传输的元素数量,datatype代表了元素类型,第四个和第五个决定了发送方/接收方的秩以及信息标签,第六个制定了使用的通信子
MPI_Recv(
void* data,
int count,
MPI_Datatype datatype,
int source,
int tag,
MPI_Comm communicator,
MPI_Status* status) //recv特有的status提供了接收到的信息的状态,可以知道实际接受了多少信息给出一个简单的例子:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16// 得到当前进程的 rank 以及整个 communicator 的大小
int world_rank;
MPI_Comm_rank(MPI_COMM_WORLD, &world_rank);
int world_size;
MPI_Comm_size(MPI_COMM_WORLD, &world_size);
int number;
if (world_rank == 0) {
number = -1;
MPI_Send(&number, 1, MPI_INT, 1, 0, MPI_COMM_WORLD);
} else if (world_rank == 1) {
MPI_Recv(&number, 1, MPI_INT, 0, 0, MPI_COMM_WORLD,
MPI_STATUS_IGNORE);
printf("Process 1 received number %d from process 0\n",
number);
}对于变长消息,MPI使用了额外的函数调用解决。MPI_Status在上面被MPI_STATUS_IGNORE忽略了,但是如果我们把MPI_Status结构体传递给MPI_Recv函数,则可以在操作完补充额外的信息。三个主要的信息包括:
- 发送端秩
- 消息的标签
- 消息的长度
完整的代码示例:阻塞通信和非阻塞通信对CPU资源的利用率不同,对应的消息函数是MPI_Isend和MPI_Irecv,给出了一个例子:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
int main(int argc, char** argv) {
// Initialize the MPI environment
MPI_Init(NULL, NULL);
//create a new communicator
MPI_Comm MyWorld;
MPI_Comm_dup(MPI_COMM_WORLD,&MyWorld);
//split processor
int my_rank,group_size,Color,Key;
MPI_Comm_rank(MyWorld,&my_rank);
MPI_Comm_size(MyWorld,&group_size);
Color = my_rank % 3;
Key = my_rank / 3;
MPI_Comm_split(MyWorld,Color,Key,&SplitWorld);
// Get the number of processes
int world_size;
MPI_Comm_size(MPI_COMM_WORLD, &world_size);
// Get the rank of the process
int world_rank;
MPI_Comm_rank(MPI_COMM_WORLD, &world_rank);
// Get the name of the processor
char processor_name[MPI_MAX_PROCESSOR_NAME];
int name_len;
MPI_Get_processor_name(processor_name, &name_len);
// Print off a hello world message
printf("Hello world from processor %s, rank %d out of %d processors\n",
processor_name, world_rank, world_size);
// Finalize the MPI environment.
MPI_Finalize();
}1
2
3
4
5
6
7
8
9
10
11MPI_Comm_rank(MPI_COMM_WORLD, &myrank); /* find rank */
if (myrank == 0) {
int x;
MPI_Isend(&x,1,MPI_INT, 1, msgtag, MPI_COMM_WORLD, req1);
compute();
MPI_Wait(req1, status); //直到handle指示的发送或者接受操作完成后才会返回,这个函数会阻塞线程的执行
//MPI_Test(req1,flag,status) //这个会根据操作完成后给flag赋值为true,但是不会阻塞线程的执行
} else if (myrank == 1) {
int x;
MPI_Recv(&x,1,MPI_INT,0,msgtag, MPI_COMM_WORLD, status);
}
群通信/组通信
- 一对多
- MPI_BCAST
- MPI_SCATTER,MPI_SCATTERV
- 多对一
- MPI_GATHER,MPI_GATHERV
- MPI_REDUCE
- 多对多
- MPI_ALLGATHER,MPI_ALLGATHERV
- MPI_ALLTOALL,MPI_ALLTOALLV,MPI_ALLTOALLW
- MPI_ALLREDUCE,MPI_REDUCE_SCATTER
- 一对多
思考:用MPI实现:一组进程组成一环,起始时每进程将自己的rank赋给一变量buf,然后将buf值传递给右侧的邻居。进程将收到的值替换buf的值,并计算收到的值之和。程序终止条件可自定。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24int main(int argc, char *argv[])
{
// my begin
int buf;
int numtasks;
int max_proc_num = 3;
MPI_Init(&argc, &argv); /*Initializes MPI calls*/
MPI_Comm_rank(MPI_COMM_WORLD, &buf); /* obtains the rank of current MPI process */
MPI_Comm_size(MPI_COMM_WORLD, &numtasks); /* obtains the total number of MPI processes */
MPI_Status status;
MPI_Request reqs[2];
// my function
int m = rank, n = 100; //m:发送数据,n:接收数据
MPI_Isend(&buf, 1 , MPI_INT, (buf + 1) % max_proc_num, 0, MPI_COMM_WORLD,req[0]);
MPI_Irecv(&buf, 1 , MPI_INT, buf + max_proc_num -1, 0, MPI_COMM_WORLD,req[1]);
MPI_Wait(&reqs[1], &status);
// my end
MPI_Finalize(); /*Finalizes MPI calls */
return buf;
}
函数式编程(MapReduce)
简介
Hadoop
- 使用hadoop进行wordcount(统计文档中词汇出现的个数):
- input
- split
- map
- shuffle
- reduce
- output
- 同样可以解决稀疏矩阵的乘法
- 使用hadoop进行wordcount(统计文档中词汇出现的个数):
Spark,比hadoop主要多了数据共享,不会发生数据的重复加载
- 迭代计算,交互式计算,流计算
- RDD组成DAG有向无环图,API比较顶层,方便使用
- RDD中间运算结果保存在内存中,延迟比较小
- task以线程方式维护,任务启动快
GPU环境下并行程序设计
CUDA简介
- 通用的并行计算模型
- 单指令、多数据执行模式(SIMD)
- 所有线程执行同一段代码
- 大量并行计算资源处理不同的数据
- 隐藏存储器延迟
- 提升计算/通信比例
- 合并相邻地址的内存访问
- 快速线程切换
- 单指令、多数据执行模式(SIMD)
- 计算线程数:dimGrid和dimBlock中的每个参数相乘得到总线程数。
- 计算线程索引:一维,二维,三维。
- 首先GPU上很多并行化的轻量级线程。kernel在device上执行时实际上是启动很多线程,一个kernel所启动的所有线程称为一个网格(grid),同一个网格上的线程共享相同的全局内存空间,grid是线程结构的第一层次,而网格又可以分为很多线程块(block),一个线程块里面包含很多线程,这是第二个层次。线程两层组织结构如下图所示,这是一个gird和block均为2-dim的线程组织。grid和block都是定义为dim3类型的变量,dim3可以看成是包含三个无符号整数(x,y,z)成员的结构体变量,在定义时,缺省值初始化为1。因此grid和block可以灵活地定义为1-dim,2-dim以及3-dim结构,对于图中结构(主要水平方向为x轴),定义的grid和block如下所示,kernel在调用时也必须通过执行配置<<<grid, block>>>来指定kernel所使用的线程数及结构。
- 通用的并行计算模型
GPU存储器组织
线程组织及计算模型
CUDA C/C++
CUDA程序结构及执行
示例
Thrust及实例