Several parameters control the behaviour of parallel execution:
set max_parallel_workers=64;
set max_parallel_maintenance_workers=64;
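force_parallel_mode: forces a parallel plan even when the planner would not choose one; intended for testing (renamed debug_parallel_query in PostgreSQL 16).
parallel_leader_participation: controls whether the leader process also executes the plan below the Gather node instead of only collecting the workers' output.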
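PostgreSQL works out the degree of parallelism automatically. Optimizer hints are also available, through the pg_hint_plan extension rather than core PostgreSQL: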
select /*+ parallel(t 4 hard)*/ count(1) from big_table t;
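The automatic choice of a worker count can be pictured with a small model. The C sketch below is a simplified approximation of the planner's size-based heuristic. It assumes that one worker is planned once the table reaches min_parallel_table_scan_size, that one more is added each time the table size triples, and that the result is capped by max_parallel_workers_per_gather. The function name estimate_parallel_workers and its parameters are hypothetical; they are not PostgreSQL APIs.

```c
#include <assert.h>
#include <limits.h>

/*
 * Hypothetical helper (not a PostgreSQL API): estimate how many workers
 * would be planned for scanning a table of `table_pages` pages.
 *   min_scan_pages - threshold in pages (assumed: min_parallel_table_scan_size)
 *   max_workers    - upper bound (assumed: max_parallel_workers_per_gather)
 */
int estimate_parallel_workers(long table_pages, long min_scan_pages, int max_workers)
{
    assert(min_scan_pages > 0 && max_workers >= 0);

    /* Tables below the threshold are scanned serially. */
    if (table_pages < min_scan_pages)
        return 0;

    int  workers = 1;
    long threshold = min_scan_pages;

    /* Each tripling of the table size adds one more worker. */
    while (threshold <= LONG_MAX / 3 && table_pages >= threshold * 3) {
        workers++;
        threshold *= 3;
    }

    /* Never exceed the configured cap. */
    return workers < max_workers ? workers : max_workers;
}
```

With 8 kB pages and an 8 MB threshold (1024 pages), a 9216-page table (72 MB) gets three workers under this model, before the cap is applied. A hint such as the one above bypasses this estimate and requests a fixed count.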
Ordinary (B-tree) indexes can be built in parallel:
DROP INDEX idx_file_name;
CREATE INDEX idx_file_name ON big_search_doc_new_ic USING btree (filename);
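GIN index builds do not run in parallel (in the PostgreSQL versions discussed here), so full-text search over large tables is a good fit for a distributed architecture such as Citus.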
CREATE INDEX big_search_doc_new_ic_tsvector_content_idx ON big_search_doc_new_ic USING gin (tsvector_content);
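Parallel execution does not cover writes. INSERT ... SELECT does not get a parallel plan, and for CREATE TABLE AS SELECT only the underlying SELECT can run in parallel (since PostgreSQL 11), while the leader alone writes the rows. The reason is that the tuple visibility machinery (the low-cost MVCC implementation used under high concurrency) cannot yet be shared safely between processes for writes, as the PostgreSQL source notes: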
- The combo CID mappings. This is needed to ensure consistent answers to
tuple visibility checks. The need to synchronize this data structure is
a major reason why we can't support writes in parallel mode: such writes
might create new combo CIDs, and we have no way to let other workers
(or the initiating backend) know about them.
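Beyond that, individual functions and features may themselves not support parallel execution. They are classified into three levels: PROPARALLEL_UNSAFE, PROPARALLEL_RESTRICTED, and PROPARALLEL_SAFE.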
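How the three levels combine for a whole query can be sketched in C. The marker characters below match the values stored in pg_proc.proparallel; the helper functions hazard_rank, worst_parallel_hazard, and may_run_in_worker are hypothetical and only model the rule that the strictest marker in a query decides what the planner may do.

```c
#include <assert.h>
#include <stddef.h>

/* Marker characters as stored in pg_proc.proparallel. */
#define PROPARALLEL_SAFE        's'  /* may run in leader or workers */
#define PROPARALLEL_RESTRICTED  'r'  /* parallel plan allowed, but runs in leader only */
#define PROPARALLEL_UNSAFE      'u'  /* forbids a parallel plan altogether */

/* Hypothetical helper: larger rank means a stricter limitation. */
static int hazard_rank(char level)
{
    switch (level) {
        case PROPARALLEL_SAFE:       return 0;
        case PROPARALLEL_RESTRICTED: return 1;
        default:                     return 2;  /* unknown markers count as unsafe */
    }
}

/*
 * Hypothetical helper: given the markers of every function a query uses,
 * return the strictest one.
 */
char worst_parallel_hazard(const char *levels, size_t n)
{
    static const char by_rank[] = {
        PROPARALLEL_SAFE, PROPARALLEL_RESTRICTED, PROPARALLEL_UNSAFE
    };
    int worst = 0;

    assert(levels != NULL || n == 0);
    for (size_t i = 0; i < n; i++) {
        int rank = hazard_rank(levels[i]);
        if (rank > worst)
            worst = rank;
    }
    return by_rank[worst];
}

/* Only parallel-safe expressions may be evaluated inside a worker. */
int may_run_in_worker(char level)
{
    return level == PROPARALLEL_SAFE;
}
```

User-defined functions are marked unsafe unless declared otherwise, so a single unmarked function in a query is enough to rule out a parallel plan.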
Oracle parallel execution
Because PostgreSQL relies heavily on the Linux page cache, I/O is generally not the problem here.
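For parallel execution on SMP there are two modes. In databases, what we usually mean is data-partitioned parallel execution (intra-operator parallelism), in which every worker runs the same operation on its own slice of the data.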
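A minimal C sketch of data-partitioned execution: the input is cut into contiguous slices, every worker counts matches in its own slice, and the caller adds up the partial results. Threads stand in for PostgreSQL's worker processes here, and the names Slice, count_slice, and parallel_count are hypothetical.

```c
#include <assert.h>
#include <pthread.h>
#include <stddef.h>

#define MAX_WORKERS 16

/* One slice of the input handed to one worker (hypothetical structure). */
typedef struct Slice {
    const int *rows;     /* first row of this slice */
    size_t     nrows;    /* number of rows in this slice */
    int        needle;   /* value to count */
    long       matches;  /* partial result written by the worker */
} Slice;

/* Worker body: scan only its own slice and produce a partial count. */
static void *count_slice(void *arg)
{
    Slice *s = (Slice *) arg;
    s->matches = 0;
    for (size_t i = 0; i < s->nrows; i++)
        if (s->rows[i] == s->needle)
            s->matches++;
    return NULL;
}

/*
 * Count rows equal to `needle` using `nworkers` threads.
 * Returns -1 if a thread could not be started.
 */
long parallel_count(const int *rows, size_t nrows, int needle, int nworkers)
{
    assert(rows != NULL && nworkers >= 1 && nworkers <= MAX_WORKERS);

    pthread_t tids[MAX_WORKERS];
    Slice     slices[MAX_WORKERS];
    size_t    chunk = (nrows + (size_t) nworkers - 1) / (size_t) nworkers;
    int       started = 0, failed = 0;
    long      total = 0;

    for (int w = 0; w < nworkers; w++) {
        size_t begin = (size_t) w * chunk;
        if (begin > nrows)
            begin = nrows;               /* more workers than rows */
        size_t len = nrows - begin < chunk ? nrows - begin : chunk;

        slices[w] = (Slice){ rows + begin, len, needle, 0 };
        if (pthread_create(&tids[w], NULL, count_slice, &slices[w]) != 0) {
            failed = 1;
            break;
        }
        started++;
    }

    /* The "gather" step: wait for each worker and add up the partial counts. */
    for (int w = 0; w < started; w++) {
        pthread_join(tids[w], NULL);
        total += slices[w].matches;
    }
    return failed ? -1 : total;
}
```

The result does not depend on the number of workers, which is what makes this form of parallelism easy to apply to scans and aggregates.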
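The other is inter-operator parallel execution, that is, the pipeline/ETL model commonly used by stream-processing engines such as Spark and Flink, in which different operators run concurrently and pass rows downstream.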
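For contrast, the pipeline model can be sketched as two operators running at the same time and connected by a small bounded queue. This is an illustration only, not how PostgreSQL or any particular streaming engine is implemented; Pipe, scan_stage, and pipeline_sum_of_squares are hypothetical names.

```c
#include <pthread.h>

#define QCAP 4

/* A small bounded queue connecting two pipeline stages. */
typedef struct Pipe {
    int             buf[QCAP];
    int             head, count;
    int             closed;      /* set once the producer has finished */
    pthread_mutex_t lock;
    pthread_cond_t  not_empty, not_full;
} Pipe;

static void pipe_put(Pipe *p, int v)
{
    pthread_mutex_lock(&p->lock);
    while (p->count == QCAP)                 /* back-pressure on the producer */
        pthread_cond_wait(&p->not_full, &p->lock);
    p->buf[(p->head + p->count) % QCAP] = v;
    p->count++;
    pthread_cond_signal(&p->not_empty);
    pthread_mutex_unlock(&p->lock);
}

static void pipe_close(Pipe *p)
{
    pthread_mutex_lock(&p->lock);
    p->closed = 1;
    pthread_cond_broadcast(&p->not_empty);
    pthread_mutex_unlock(&p->lock);
}

/* Returns 1 and stores a value, or 0 once the pipe is closed and drained. */
static int pipe_get(Pipe *p, int *out)
{
    int got = 0;
    pthread_mutex_lock(&p->lock);
    while (p->count == 0 && !p->closed)
        pthread_cond_wait(&p->not_empty, &p->lock);
    if (p->count > 0) {
        *out = p->buf[p->head];
        p->head = (p->head + 1) % QCAP;
        p->count--;
        got = 1;
        pthread_cond_signal(&p->not_full);
    }
    pthread_mutex_unlock(&p->lock);
    return got;
}

typedef struct ScanArgs { Pipe *out; int nrows; } ScanArgs;

/* Stage 1 (a "scan" operator): emits the rows 1..nrows, then closes the pipe. */
static void *scan_stage(void *arg)
{
    ScanArgs *a = (ScanArgs *) arg;
    for (int i = 1; i <= a->nrows; i++)
        pipe_put(a->out, i);
    pipe_close(a->out);
    return NULL;
}

/* Stage 2 (an "aggregate" operator) runs in the caller while stage 1 produces. */
long pipeline_sum_of_squares(int nrows)
{
    Pipe      p = { .head = 0, .count = 0, .closed = 0 };
    ScanArgs  args = { &p, nrows };
    pthread_t scan;
    long      sum = 0;
    int       v;

    pthread_mutex_init(&p.lock, NULL);
    pthread_cond_init(&p.not_empty, NULL);
    pthread_cond_init(&p.not_full, NULL);

    if (pthread_create(&scan, NULL, scan_stage, &args) != 0)
        return -1;
    while (pipe_get(&p, &v))
        sum += (long) v * v;
    pthread_join(scan, NULL);

    pthread_cond_destroy(&p.not_full);
    pthread_cond_destroy(&p.not_empty);
    pthread_mutex_destroy(&p.lock);
    return sum;
}
```

Here the degree of parallelism is bounded by the number of stages rather than by the data volume, which is the main practical difference from the data-partitioned model.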
How parallel execution is implemented inside PostgreSQL
Processes notify each other through signals; the relevant signal reason is:
PROCSIG_PARALLEL_MESSAGE, /* message from cooperating parallel backend */
typedef struct ParallelExecutorInfo
{
    PlanState  *planstate;          /* plan subtree we're running in parallel */
    ParallelContext *pcxt;          /* parallel context we're using */
    BufferUsage *buffer_usage;      /* points to bufusage area in DSM */
    WalUsage   *wal_usage;          /* walusage area in DSM */
    SharedExecutorInstrumentation *instrumentation; /* optional */
    struct SharedJitInstrumentation *jit_instrumentation;   /* optional */
    dsa_area   *area;               /* points to DSA area in DSM */
    dsa_pointer param_exec;         /* serialized PARAM_EXEC parameters */
    bool        finished;           /* set true by ExecParallelFinish */
    /* These two arrays have pcxt->nworkers_launched entries: */
    shm_mq_handle **tqueue;         /* tuple queues for worker output */
    struct TupleQueueReader **reader;   /* tuple reader/writer support */
} ParallelExecutorInfo;
The reader is responsible for reading tuples from the shared queues into which the workers write their results.
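The leader's side of this can be modelled as a round-robin poll over one queue per worker. The C sketch below is a toy, single-process model: ToyQueue and ToyGather are hypothetical stand-ins for the shm_mq-backed tuple queues and their readers, and toy_gather_next only illustrates the polling order, not the real non-blocking shared-memory protocol.

```c
#include <assert.h>
#include <stddef.h>

/* Toy stand-in for one worker's tuple queue (not shm_mq). */
typedef struct ToyQueue {
    const int *tuples;    /* tuples the worker has produced */
    size_t     ntuples;
    size_t     next;      /* read position */
} ToyQueue;

/* Toy stand-in for the leader-side state over all worker queues. */
typedef struct ToyGather {
    ToyQueue *queues;
    int       nqueues;
    int       nextreader; /* queue to try first on the next call */
} ToyGather;

/*
 * Fetch the next tuple from any queue, visiting queues round-robin.
 * Returns 1 and stores the tuple in *out, or 0 when every queue is drained.
 */
int toy_gather_next(ToyGather *g, int *out)
{
    assert(g != NULL && out != NULL);

    for (int tried = 0; tried < g->nqueues; tried++) {
        ToyQueue *q = &g->queues[g->nextreader];

        /* Advance first so the following call starts at the next queue. */
        g->nextreader = (g->nextreader + 1) % g->nqueues;

        if (q->next < q->ntuples) {
            *out = q->tuples[q->next++];
            return 1;
        }
    }
    return 0;   /* all queues are empty */
}
```

Because tuples are taken from whichever queue has data, the output interleaves the workers' results, which is why a plain parallel scan gives no ordering guarantee.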
typedef struct ParallelWorkerInfo
{
    BackgroundWorkerHandle *bgwhandle;
    shm_mq_handle *error_mqh;
    int32       pid;
} ParallelWorkerInfo;

typedef struct ParallelContext
{
    dlist_node  node;
    SubTransactionId subid;
    int         nworkers;           /* Maximum number of workers to launch */
    int         nworkers_to_launch; /* Actual number of workers to launch */
    int         nworkers_launched;
    char       *library_name;
    char       *function_name;
    ErrorContextCallback *error_context_stack;
    shm_toc_estimator estimator;
    dsm_segment *seg;
    void       *private_memory;
    shm_toc    *toc;
    ParallelWorkerInfo *worker;
    int         nknown_attached_workers;
    bool       *known_attached_workers;
} ParallelContext;

typedef struct ParallelWorkerContext
{
    dsm_segment *seg;
    shm_toc    *toc;
} ParallelWorkerContext;