
PostgreSQL source code analysis: RETURNS SETOF functions vs. Oracle PIPELINED functions

2023-02-03 18:04:04

Introduction

【Behavior】

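  • Oracle's RETURN ... PIPELINED functions change how a collection is returned. Instead of returning it all at once, the function emits rows one by one with PIPE ROW (one element of the collection) to the SQL layer, which greatly reduces memory usage.
  • PostgreSQL's RETURNS SETOF functions give no such memory savings. RETURN NEXT only buffers a single row, and rows are not handed to the SQL layer one at a time. They accumulate in a tuplestore, which spills to a temporary file once work_mem is exceeded, and the SQL layer receives the whole set only after the function finishes.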

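【Code】

  • In exec_stmt_return_next, the tupledesc is taken from the execution plan node. Returned values must conform to that desc, and buffered rows are stored according to it.
  • How RETURN NEXT handles rec types vs. row types:
    • A rec is essentially a tuple: both its data and its desc are stored in expanded form in erh. Standard functions convert it into a tuple that matches another desc by remapping columns by position. Column types must still match, otherwise an error is raised. 【After conversion, the tuple is buffered through the standard tuplestore interface.】
    • A row is essentially a virtual row made of a set of datum slots: row->varnos[i] points to a datum. Converting a row into a tuple takes three steps:
      • exec_eval_datum computes the value of each datum a varno points to.
      • The results are assembled into values and nulls arrays.
      • heap_form_tuple builds the tuple.
      No type conversion happens here. If the computed column types do not match the required desc, NULL is returned and RETURN NEXT raises an error. On success, 【the tuple is buffered through the standard tuplestore interface.】
  • How RETURN NEXT handles var types: a var is treated as a single-column tuple. Its value is cast to the type given by the plan's desc, and a tuple is built from it. 【After conversion, the tuple is buffered through the standard tuplestore interface.】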

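【Useful functions】

  • General
    • Type conversion: exec_cast_value. If the value is a read-write pointer to an expanded object, first turn it into a read-only pointer with MakeExpandedObjectReadOnly. Both forms are 1-byte external varlena ("1be") headers that point at the real ExpandedObjectHeader ("eoh"). The read-only form keeps the cast from modifying the variable in place.
    • Build a minimal tuple from values/nulls arrays: heap_form_minimal_tuple
    • Convert a tuple that has one desc into a tuple with another desc: convert_tuples_by_position, execute_attr_map_tuple
  • tuplestore:
    • Store a tuple given as values/nulls arrays: tuplestore_putvalues. It forms the tuple with heap_form_minimal_tuple, then passes it to tuplestore_puttuple_common.
    • Store a ready-made HeapTuple directly: tuplestore_puttuple
  • Types
    • Look up a desc from a type OID and typmod: lookup_rowtype_tupdesc
  • erh
    • Get a flat tuple from an expanded record (erh): expanded_record_get_tuple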

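1 What a pipelined function is and where it is used

Oracle supports pipelined functions. Writing RETURN <collection type> PIPELINED in the function definition marks the function as pipelined.

The main benefit of a pipelined function is that a collection that would otherwise be returned all at once is returned row by row instead. This greatly reduces memory usage.

For example, the nested table type outrecset is the return type of the function f_trans. A normal function must assemble the entire nested table outrecset, fully buffered in memory, and return it in one go. If the nested table holds a lot of data, it can take a large amount of memory.

A pipelined function uses PIPE ROW(one row of the nested table) in place of the RETURN statement. The function hands the nested table to the caller row by row without buffering it, which lowers memory usage.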
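The memory difference can be made concrete with a minimal C sketch. It contrasts the two styles and is not Oracle's implementation. The names OutRec, RowSink, produce_materialized, produce_pipelined and sum_sink are hypothetical.

- The materializing producer holds all n rows at once.
- The pipelined producer hands each row to a consumer callback as soon as it is built, much as PIPE ROW hands rows to the SQL layer.

```c
#include <assert.h>
#include <stdlib.h>

/* Hypothetical row type standing in for outrec_typ. */
typedef struct { int var_num; } OutRec;

/* Consumer callback: plays the role of the SQL layer receiving one row. */
typedef void (*RowSink)(const OutRec *row, void *ctx);

/* Materializing style: build the whole collection, then hand it over.
 * Every row is alive at the same time, so peak usage grows with n. */
OutRec *produce_materialized(int n, size_t *peak_rows)
{
    assert(n >= 0);
    OutRec *set = malloc(sizeof(OutRec) * (size_t) (n > 0 ? n : 1));
    if (set == NULL)
        return NULL;
    for (int i = 0; i < n; i++)
        set[i].var_num = i;
    *peak_rows = (size_t) n;        /* all n rows buffered at once */
    return set;                      /* caller must free() */
}

/* Pipelined style: emit each row as soon as it is built ("PIPE ROW"),
 * so the producer only ever holds a single row. */
void produce_pipelined(int n, RowSink sink, void *ctx, size_t *peak_rows)
{
    assert(n >= 0 && sink != NULL);
    OutRec row;                      /* one reusable row buffer */
    for (int i = 0; i < n; i++) {
        row.var_num = i;
        sink(&row, ctx);             /* consumer handles it immediately */
    }
    *peak_rows = n > 0 ? 1 : 0;
}

/* Example consumer: sums var_num without storing any rows. */
void sum_sink(const OutRec *row, void *ctx)
{
    *(long *) ctx += row->var_num;
}
```

With n = 1000, the materializing version reports a peak of 1000 buffered rows. The pipelined version reports 1, while its consumer still sees every row and computes the same sum.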

Oracle example:

CREATE OR REPLACE PACKAGE refcur_pkg AUTHID DEFINER IS
  TYPE refcur_t IS REF CURSOR RETURN employees%ROWTYPE;
  TYPE outrec_typ IS RECORD (
    var_num    NUMBER(6),
    var_char1  VARCHAR2(30),
    var_char2  VARCHAR2(30)
  );
  TYPE outrecset IS TABLE OF outrec_typ;
  FUNCTION f_trans (p refcur_t) RETURN outrecset PIPELINED;
END refcur_pkg;
/
CREATE OR REPLACE PACKAGE BODY refcur_pkg IS
  FUNCTION f_trans (p refcur_t) RETURN outrecset PIPELINED IS
    out_rec outrec_typ;
    in_rec  p%ROWTYPE;
  BEGIN
    LOOP
      FETCH p INTO in_rec;  -- input row
      EXIT WHEN p%NOTFOUND;
      out_rec.var_num := in_rec.employee_id;
      out_rec.var_char1 := in_rec.first_name;
      out_rec.var_char2 := in_rec.last_name;
      PIPE ROW(out_rec);  -- first transformed output row
      out_rec.var_char1 := in_rec.email;
      out_rec.var_char2 := in_rec.phone_number;
      PIPE ROW(out_rec);  -- second transformed output row
    END LOOP;
    CLOSE p;
    RETURN;
  END f_trans;
END refcur_pkg;
/
SELECT * FROM TABLE (
  refcur_pkg.f_trans (
    CURSOR (SELECT * FROM employees WHERE department_id = 60)
  )
);

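In PG, a plain RETURN statement also returns its data all at once. PG apparently modeled RETURN NEXT on Oracle, aiming to return data row by row. PG has no collection types, so a plain scalar type is used in the example: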

drop function f1;
create or replace function f1(in i int, out j int) returns setof int as $$
begin
  j := i+1;
  return next;
  j := i+2;
  return next;
  return;
end$$ language plpgsql;
select * from f1(42);
 j  
----
 43
 44

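In the core implementation, however, rows are not returned one by one. RETURN NEXT only buffers data, and the complete result set is still handed to the SQL layer at once. This is no different from a plain RETURN; the difference is purely syntactic.

So PG's RETURNS SETOF functions do not reduce memory usage. The process is analyzed in detail below.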

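2 The RETURN NEXT implementation

RETURN NEXT currently supports three kinds of datums: var, rec and row. RETURN NEXT can also be written without an argument. In that case the returned value is assembled from the OUT parameters.

The handler function is exec_stmt_return_next: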

static int
exec_stmt_return_next(PLpgSQL_execstate *estate,
					  PLpgSQL_stmt_return_next *stmt)
{
	TupleDesc	tupdesc;
	int			natts;
	HeapTuple	tuple;
	MemoryContext oldcontext;

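1 Initializing the tuple store

Initialization summary:

1 Initialization amounts to constructing a Tuplestorestate. The main actions are:

  • Create the Tuplestorestate in the ExecutorState memory context (estate->tuple_store_cxt) rather than in the function's own context
  • Record that random access is not allowed (rewind only): eflags = EXEC_FLAG_REWIND
  • Record three operation callbacks: copytup_heap, writetup_heap, readtup_heap

2 Set estate->tuple_store_desc. The desc comes from:

  • FunctionNext, which takes it from the plan state node (the FunctionScanState's per-function tupdesc) and passes it to ExecMakeTableFunctionResult
  • ExecMakeTableFunctionResult, which builds a ReturnSetInfo carrying it as expectedDesc and hangs that on fcinfo->resultinfo
  • plpgsql_exec_function, which takes the ReturnSetInfo out of fcinfo
  • plpgsql_estate_setup, which stores it in the estate (estate->rsi = rsi); exec_init_tuple_store later reads rsi->expectedDesc
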
#0  plpgsql_estate_setup (estate=0x7ffd81e2f850, func=0x2419028, rsi=0x7ffd81e2fb20, simple_eval_estate=0x0, simple_eval_resowner=0x0) at pl_exec.c:3972
#1  0x00007fe0a3992064 in plpgsql_exec_function (func=0x2419028, fcinfo=0x24da5a8, simple_eval_estate=0x0, simple_eval_resowner=0x0, procedure_resowner=0x0, atomic=true) at pl_exec.c:485
#2  0x00007fe0a39ac8f9 in plpgsql_call_handler (fcinfo=0x24da5a8) at pl_handler.c:277
#3  0x0000000000738829 in ExecMakeTableFunctionResult (setexpr=0x24e0b40, econtext=0x24e0a10, argContext=0x24da490, expectedDesc=0x24e1110, randomAccess=false) at execSRF.c:235
#4  0x0000000000753eed in FunctionNext (node=0x24e0800) at nodeFunctionscan.c:95
#5  0x000000000073a081 in ExecScanFetch (node=0x24e0800, accessMtd=0x753e3b <FunctionNext>, recheckMtd=0x754242 <FunctionRecheck>) at execScan.c:133
#6  0x000000000073a0f6 in ExecScan (node=0x24e0800, accessMtd=0x753e3b <FunctionNext>, recheckMtd=0x754242 <FunctionRecheck>) at execScan.c:182
#7  0x000000000075428c in ExecFunctionScan (pstate=0x24e0800) at nodeFunctionscan.c:270
#8  0x000000000073614e in ExecProcNodeFirst (node=0x24e0800) at execProcnode.c:464
#9  0x000000000072a08a in ExecProcNode (node=0x24e0800) at ../../../src/include/executor/executor.h:262
#10 0x000000000072cb80 in ExecutePlan (estate=0x24e05d8, planstate=0x24e0800, use_parallel_mode=false, operation=CMD_SELECT, sendTuples=true, numberTuples=0, direction=ForwardScanDirection, dest=0x24d5910, execute_once=true) at execMain.c:1632
#11 0x000000000072a6d1 in standard_ExecutorRun (queryDesc=0x23f1248, direction=ForwardScanDirection, count=0, execute_once=true) at execMain.c:364
#12 0x000000000072a50b in ExecutorRun (queryDesc=0x23f1248, direction=ForwardScanDirection, count=0, execute_once=true) at execMain.c:308
#13 0x0000000000997ba9 in PortalRunSelect (portal=0x2474a28, forward=true, count=0, dest=0x24d5910) at pquery.c:924
#14 0x0000000000997867 in PortalRun (portal=0x2474a28, count=9223372036854775807, isTopLevel=true, run_once=true, dest=0x24d5910, altdest=0x24d5910, qc=0x7ffd81e300b0) at pquery.c:768
#15 0x0000000000991408 in exec_simple_query (query_string=0x23c9518 "select * from f1(42);") at postgres.c:1238
#16 0x0000000000995a3e in PostgresMain (dbname=0x2400998 "postgres", username=0x23c5178 "mingjie") at postgres.c:4563
#17 0x00000000008d3cfe in BackendRun (port=0x23f7220) at postmaster.c:4396
#18 0x00000000008d3697 in BackendStartup (port=0x23f7220) at postmaster.c:4124
#19 0x00000000008d00b8 in ServerLoop () at postmaster.c:1791
#20 0x00000000008cf98a in PostmasterMain (argc=1, argv=0x23c3120) at postmaster.c:1463
#21 0x00000000007ada4b in main (argc=1, argv=0x23c3120) at main.c:200
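The desc hand-off summarized above can be modeled with a few toy structs. This sketch simplifies heavily:

- ToyTupleDesc, ToyReturnSetInfo, ToyFunctionCallInfo and ToyExecState are hypothetical stand-ins that keep only the fields involved. They are not PostgreSQL's real definitions.
- The toy_* functions are hypothetical too.

The sketch shows that the desc the function writes rows against is exactly the pointer that the executor node supplied.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical toy stand-ins; the real PostgreSQL structs have many more fields. */
typedef struct { int natts; } ToyTupleDesc;
typedef struct { const ToyTupleDesc *expectedDesc; } ToyReturnSetInfo;
typedef struct { void *resultinfo; } ToyFunctionCallInfo;
typedef struct {
    ToyReturnSetInfo   *rsi;
    const ToyTupleDesc *tuple_store_desc;
} ToyExecState;

/* Like plpgsql_estate_setup: remember the ReturnSetInfo in estate->rsi. */
void toy_estate_setup(ToyExecState *estate, ToyReturnSetInfo *rsi)
{
    estate->rsi = rsi;
    estate->tuple_store_desc = NULL;    /* filled lazily on first RETURN NEXT */
}

/* Like exec_init_tuple_store: copy rsi->expectedDesc into the estate. */
void toy_init_tuple_store(ToyExecState *estate)
{
    assert(estate->rsi != NULL);
    estate->tuple_store_desc = estate->rsi->expectedDesc;
}

/* Like plpgsql_exec_function: pull the ReturnSetInfo out of fcinfo. */
const ToyTupleDesc *toy_exec_function(ToyFunctionCallInfo *fcinfo)
{
    ToyExecState estate;
    toy_estate_setup(&estate, (ToyReturnSetInfo *) fcinfo->resultinfo);
    toy_init_tuple_store(&estate);      /* as if a RETURN NEXT had run */
    return estate.tuple_store_desc;
}

/* Like ExecMakeTableFunctionResult: wrap the plan node's desc in a
 * ReturnSetInfo and hang it on fcinfo->resultinfo before the call. */
const ToyTupleDesc *toy_make_table_function_result(const ToyTupleDesc *node_desc)
{
    ToyReturnSetInfo rsinfo = { .expectedDesc = node_desc };
    ToyFunctionCallInfo fcinfo = { .resultinfo = &rsinfo };
    return toy_exec_function(&fcinfo);
}
```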

Analysis:

	if (estate->tuple_store == NULL)
		exec_init_tuple_store(estate);
	tupdesc = estate->tuple_store_desc;
	natts = tupdesc->natts;
	if (stmt->retvarno >= 0)
	{
		PLpgSQL_datum *retvar = estate->datums[stmt->retvarno];
		switch (retvar->dtype)
		{

The initialization function exec_init_tuple_store:

static void
exec_init_tuple_store(PLpgSQL_execstate *estate)
{
	ReturnSetInfo *rsi = estate->rsi;
	MemoryContext oldcxt;
	ResourceOwner oldowner;
// switch the memory context from "SPI Proc" to "ExecutorState"
	oldcxt = MemoryContextSwitchTo(estate->tuple_store_cxt);
// switch the resource owner from "Portal" to "Portal" (the same owner in this trace)
	oldowner = CurrentResourceOwner;
	CurrentResourceOwner = estate->tuple_store_owner;
// enter tuplestore_begin_heap
	estate->tuple_store =
		tuplestore_begin_heap(rsi->allowedModes & SFRM_Materialize_Random, false, work_mem);
	CurrentResourceOwner = oldowner;
	MemoryContextSwitchTo(oldcxt);
// set the desc on estate; rsi->expectedDesc comes from the plan node (see the summary above)
	estate->tuple_store_desc = rsi->expectedDesc;
}
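A likely reason for switching to estate->tuple_store_cxt is that the tuplestore must outlive the function call. The executor reads it only after the function returns, and by then the function's own "SPI Proc" context has presumably been released. The save/switch/restore pattern can be sketched as follows.

ToyContext, CurrentToyContext and the toy_* functions are hypothetical stand-ins for MemoryContext, CurrentMemoryContext, MemoryContextSwitchTo and palloc. The resource-owner swap in exec_init_tuple_store follows the same pattern.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical toy memory context: a name plus an allocation counter. */
typedef struct { const char *name; int nallocs; } ToyContext;

ToyContext *CurrentToyContext = NULL;    /* like CurrentMemoryContext */

/* Like MemoryContextSwitchTo: install cxt and return the previous one. */
ToyContext *toy_switch_to(ToyContext *cxt)
{
    ToyContext *old = CurrentToyContext;
    CurrentToyContext = cxt;
    return old;
}

/* Like palloc: charge the allocation to whichever context is current. */
void toy_alloc(void)
{
    assert(CurrentToyContext != NULL);
    CurrentToyContext->nallocs++;
}

/* Like exec_init_tuple_store: allocate the store in the long-lived
 * caller context, then restore the function's own context. */
void toy_create_store(ToyContext *tuple_store_cxt)
{
    ToyContext *oldcxt = toy_switch_to(tuple_store_cxt);
    toy_alloc();                 /* the Tuplestorestate itself */
    toy_switch_to(oldcxt);
}
```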

Inside tuplestore_begin_heap:

Tuplestorestate *
tuplestore_begin_heap(bool randomAccess, bool interXact, int maxKBytes)
{
// inputs: false (no random access), false (interXact), 8192 (work_mem, in KB)
	Tuplestorestate *state;
	int			eflags;
// here eflags = EXEC_FLAG_REWIND
	eflags = randomAccess ?
		(EXEC_FLAG_BACKWARD | EXEC_FLAG_REWIND) :
		(EXEC_FLAG_REWIND);
// enter the tuplestore module, which initializes and returns a Tuplestorestate; note that it uses the current memory context directly
	state = tuplestore_begin_common(eflags, interXact, maxKBytes);
// state of the returned Tuplestorestate:
// state = {status = TSS_INMEM, eflags = 2, backward = false, interXact = false, 
//          truncated = false, availMem = 8372200, allowedMem = 8388608, tuples = 0,
//          myfile = 0x0, context = "ExecutorState", resowner = "Portal", copytup = 0x0, 
//          writetup = 0x0, readtup = 0x0, memtuples = 0x24f0d88, memtupdeleted = 0,
//          memtupcount = 0, memtupsize = 2048, growmemtuples = true, readptrs = 0x24e7a70, 
//          activeptr = 0, readptrcount = 1, readptrsize = 8, writepos_file = 0,writepos_offset = 0}
	state->copytup = copytup_heap;
	state->writetup = writetup_heap;
	state->readtup = readtup_heap;
	return state;
}
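The eflags value and the memory budget in the gdb dump can be reproduced with simple arithmetic. The flag values in this sketch are assumed to match the traced version, where REWIND alone gives eflags = 2 as the dump shows; they may differ in other releases. toy_tuplestore_eflags and toy_allowed_mem are hypothetical helpers. maxKBytes is work_mem in kilobytes, so 8192 KB gives allowedMem = 8388608 bytes.

```c
#include <assert.h>
#include <stdbool.h>

/* Assumed flag values for the traced version (eflags = 2 in the dump). */
#define EXEC_FLAG_REWIND   0x0002
#define EXEC_FLAG_BACKWARD 0x0004

/* Same decision as tuplestore_begin_heap: rewind only, or rewind + backward. */
int toy_tuplestore_eflags(bool randomAccess)
{
    return randomAccess ? (EXEC_FLAG_BACKWARD | EXEC_FLAG_REWIND)
                        : EXEC_FLAG_REWIND;
}

/* maxKBytes (work_mem) is given in kilobytes; the store budgets bytes. */
long toy_allowed_mem(int maxKBytes)
{
    assert(maxKBytes > 0);
    return (long) maxKBytes * 1024L;
}
```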

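Next, the code branches according to the kind of value being returned.

Before branching, the desc has already been obtained: tupdesc = estate->tuple_store_desc; natts = tupdesc->natts;

Case 1: RETURN NEXT returns a var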

			case PLPGSQL_DTYPE_VAR:
				{
					PLpgSQL_var *var = (PLpgSQL_var *) retvar;
					Datum		retval = var->value;
					bool		isNull = var->isnull;
					Form_pg_attribute attr = TupleDescAttr(tupdesc, 0);
					if (natts != 1)
						ereport(ERROR,
								(errcode(ERRCODE_DATATYPE_MISMATCH),
								 errmsg("wrong result type supplied in RETURN NEXT")));
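					// if retval is a read-write pointer to an expanded object (a 1be header pointing at the eoh), make it read-only so the cast step cannot modify the variable in place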
					retval = MakeExpandedObjectReadOnly(retval, isNull, var->datatype->typlen);
					// coerce to the type the desc requires
					retval = exec_cast_value(estate,
											 retval,
											 &isNull,
											 var->datatype->typoid,
											 var->datatype->atttypmod,
											 attr->atttypid,
											 attr->atttypmod);
					tuplestore_putvalues(estate->tuple_store, tupdesc, &retval, &isNull);
				}
				break;

tuplestore_putvalues stores the tuple:

void
tuplestore_putvalues(Tuplestorestate *state, TupleDesc tdesc,
					 Datum *values, bool *isnull)
{
	MinimalTuple tuple;
	MemoryContext oldcxt = MemoryContextSwitchTo(state->context);
	tuple = heap_form_minimal_tuple(tdesc, values, isnull);
// account for the space used by decreasing state->availMem
	USEMEM(state, GetMemoryChunkSpace(tuple));
	tuplestore_puttuple_common(state, (void *) tuple);
	MemoryContextSwitchTo(oldcxt);
}
static void
tuplestore_puttuple_common(Tuplestorestate *state, void *tuple)
{
	TSReadPointer *readptr;
	int			i;
	ResourceOwner oldowner;
	state->tuples++;
	switch (state->status)
	{

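In the in-memory state, tuples are cached directly in an array. The memory for each tuple was allocated by the caller (tuplestore_putvalues) after it switched to state->context. Once the array cannot grow further or the memory budget is exhausted (LACKMEM), the store does three things:

  • creates a temporary file (BufFileCreateTemp)
  • switches to TSS_WRITEFILE
  • dumps the buffered tuples to the file (dumptuples)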

		case TSS_INMEM:
			readptr = state->readptrs;
			for (i = 0; i < state->readptrcount; readptr++, i++)
			{
				if (readptr->eof_reached && i != state->activeptr)
				{
					readptr->eof_reached = false;
					readptr->current = state->memtupcount;
				}
			}
			if (state->memtupcount >= state->memtupsize - 1)
			{
				(void) grow_memtuples(state);
			}
			state->memtuples[state->memtupcount++] = tuple;
			if (state->memtupcount < state->memtupsize && !LACKMEM(state))
				return;
			PrepareTempTablespaces();
			oldowner = CurrentResourceOwner;
			CurrentResourceOwner = state->resowner;
			state->myfile = BufFileCreateTemp(state->interXact);
			CurrentResourceOwner = oldowner;
			state->backward = (state->eflags & EXEC_FLAG_BACKWARD) != 0;
			state->status = TSS_WRITEFILE;
			dumptuples(state);
			break;
...
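The TSS_INMEM branch above amounts to "append to an array until memory runs out, then switch to a temporary file". The sketch below is a simplified toy model of that state change, not tuplestore.c itself:

- ToyStore and the toy_store_* functions are hypothetical.
- Rows are plain ints.
- Memory accounting mimics USEMEM/LACKMEM.
- The grow_memtuples step is omitted, so the array has a fixed size.

```c
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>

typedef enum { TOY_INMEM, TOY_WRITEFILE } ToyStatus;

/* Hypothetical toy tuplestore: rows are ints, the budget is in bytes. */
typedef struct {
    ToyStatus status;
    long      availMem;     /* bytes left, like state->availMem */
    int      *memtuples;    /* in-memory array */
    int       memtupcount;
    int       memtupsize;
    FILE     *myfile;       /* temp file once we have spilled */
    long      tuples;       /* total rows stored so far */
} ToyStore;

int toy_store_init(ToyStore *s, long allowedMem, int initsize)
{
    assert(initsize > 0);
    s->status = TOY_INMEM;
    s->availMem = allowedMem;
    s->memtuples = malloc(sizeof(int) * (size_t) initsize);
    s->memtupcount = 0;
    s->memtupsize = initsize;
    s->myfile = NULL;
    s->tuples = 0;
    return s->memtuples != NULL ? 0 : -1;
}

/* Store one row; once memory runs out, spill everything to a temp file. */
int toy_store_put(ToyStore *s, int row)
{
    s->tuples++;
    if (s->status == TOY_WRITEFILE)
        return fwrite(&row, sizeof row, 1, s->myfile) == 1 ? 0 : -1;

    s->availMem -= (long) sizeof row;                   /* like USEMEM */
    s->memtuples[s->memtupcount++] = row;
    if (s->memtupcount < s->memtupsize && s->availMem >= 0)  /* !LACKMEM */
        return 0;

    /* Array full or budget exhausted: switch to file mode and dump. */
    s->myfile = tmpfile();
    if (s->myfile == NULL)
        return -1;
    s->status = TOY_WRITEFILE;
    size_t n = (size_t) s->memtupcount;
    if (fwrite(s->memtuples, sizeof(int), n, s->myfile) != n)
        return -1;
    s->memtupcount = 0;
    s->availMem += (long) (n * sizeof(int));            /* like FREEMEM */
    return 0;
}

void toy_store_end(ToyStore *s)
{
    free(s->memtuples);
    if (s->myfile != NULL)
        fclose(s->myfile);
}
```

With a budget of four ints, the first four rows stay in memory and the fifth triggers the switch to TOY_WRITEFILE. Every later row goes straight to the file.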

Case 2: RETURN NEXT returns a record

			case PLPGSQL_DTYPE_REC:
				{
					PLpgSQL_rec *rec = (PLpgSQL_rec *) retvar;
					TupleDesc	rec_tupdesc;
					TupleConversionMap *tupmap;

The record variable as seen in gdb:

{dtype = PLPGSQL_DTYPE_REC, dno = 1, refname = 0x24db608 "r", lineno = 3, isconst = false, notnull = false, default_val = 0x0, datatype = {typname='foo'}, rectypeid = 17117, firstfield = -1, erh = 0x2509708}

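  • Both the data and the desc (including column names) live in erh. firstfield is the dno of the first RECFIELD datum that references this record; it is -1 here, meaning there is none.
  • The data type is in datatype: foo
  • The type OID is in rectypeid: 17117 -> foo
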
					if (rec->erh == NULL)
						instantiate_empty_record_variable(estate, rec);
					if (ExpandedRecordIsEmpty(rec->erh))
						deconstruct_expanded_record(rec->erh);
// switch from "SPI Proc" to "ExprContext"
					oldcontext = MemoryContextSwitchTo(get_eval_mcontext(estate));
// return erh->er_tupdesc;
					rec_tupdesc = expanded_record_get_tupdesc(rec->erh);
// convert from the stored desc (rec_tupdesc) to the output desc (tupdesc), step 1: build the conversion map
					tupmap = convert_tuples_by_position(rec_tupdesc,
														tupdesc,
														gettext_noop("wrong record type supplied in RETURN NEXT"));
					tuple = expanded_record_get_tuple(rec->erh);
					if (tupmap)
// step 2: apply the map to produce the converted tuple
						tuple = execute_attr_map_tuple(tuple, tupmap);
// buffer the tuple
					tuplestore_puttuple(estate->tuple_store, tuple);
					MemoryContextSwitchTo(oldcontext);
				}
				break;

Case 3: RETURN NEXT returns a row

This path is taken only when the function has two or more OUT parameters and RETURN NEXT is given no argument.

			case PLPGSQL_DTYPE_ROW:
				{
					PLpgSQL_row *row = (PLpgSQL_row *) retvar;
					oldcontext = MemoryContextSwitchTo(get_eval_mcontext(estate));
// column types must match tupdesc exactly; otherwise the conversion fails and NULL is returned
					tuple = make_tuple_from_row(estate, row, tupdesc);
					if (tuple == NULL)
						ereport(ERROR, ...);
					tuplestore_puttuple(estate->tuple_store, tuple);
					MemoryContextSwitchTo(oldcontext);
				}
				break;
			default:
				elog(ERROR, "unrecognized dtype: %d", retvar->dtype);
				break;
		}
	}

3 Test cases

drop function f1;
create or replace function f1(in i int, out j int) returns setof int as $$
begin
  j := i+1;
  return next;
  j := i+2;
  return next;
  return;
end$$ language plpgsql;
select * from f1(42);
----
CREATE TABLE foo (fooid INT, foosubid INT, fooname TEXT);
INSERT INTO foo VALUES (1, 2, 'three');
INSERT INTO foo VALUES (4, 5, 'six');
CREATE OR REPLACE FUNCTION get_all_foo() RETURNS SETOF foo AS
$BODY$
DECLARE
    r foo%rowtype;
BEGIN
    FOR r IN
        SELECT * FROM foo WHERE fooid > 0
    LOOP
        -- can do some processing here
        RETURN NEXT r; -- return current row of SELECT
    END LOOP;
    RETURN;
END;
$BODY$
LANGUAGE plpgsql;
SELECT * FROM get_all_foo();
--------
drop function f1(int);
create function f1(in i int, out j int, out k text) returns setof record as $$
begin
  j := i+1;
  k := 'foo';
  return next;
  j := j+1;
  k := 'foot';
  return next;
  return;
end$$ language plpgsql;
select * from f1(42);
