/* * Copyright (c) 2020 Huawei Technologies Co.,Ltd. * * openGauss is licensed under Mulan PSL v2. * You can use this software according to the terms and conditions of the Mulan PSL v2. * You may obtain a copy of Mulan PSL v2 at: * * http://license.coscl.org.cn/MulanPSL2 * * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. * See the Mulan PSL v2 for more details. * --------------------------------------------------------------------------------------- * * vectorbatch.h * Core data structure definition for vector engine. * * * IDENTIFICATION * src/include/vecexecutor/vectorbatch.h * * --------------------------------------------------------------------------------------- */ #ifndef VECTORBATCH_H_ #define VECTORBATCH_H_ #include "postgres.h" #include "access/tupdesc.h" #include "lib/stringinfo.h" #include "catalog/pg_type.h" // Scalar data type // typedef uintptr_t ScalarValue; #define V_NULL_MASK 0b00000001 #define V_NOTNULL_MASK 0b00000000 // steal bit to identify variable value #define MASK_VAR 0xC000000000000000ULL #define MASK_VAR_POINTER 0x0000000000000000ULL #define MASK_VAR_STORAGE 0x8000000000000000ULL #define MASK_POINTER 0x0FFFFFFFFFFFFFFFULL #define VAR_POINTER(val) ((val & MASK_VAR) == MASK_VAR_POINTER) #define VAR_STORAGE(val) ((val & MASK_VAR) == MASK_VAR_STORAGE) typedef enum BatchCompressType { BCT_NOCOMP, BCT_LZ4 } BatchCompressType; inline bool COL_IS_ENCODE(int typeId) { // we search the sys table to find [0,8] attlen data type // select typname,typlen from pg_type where typlen >= 0 and typlen <=8; switch (typeId) { case CHAROID: case BOOLOID: case INT2OID: case INT8OID: case INT1OID: case INT4OID: case FLOAT4OID: case FLOAT8OID: case CASHOID: case DATEOID: case TIMEOID: case TIMESTAMPOID: case TIMESTAMPTZOID: case SMALLDATETIMEOID: case OIDOID: case TIDOID: case CIDOID: case ABSTIMEOID: case RELTIMEOID: case ANYOID: case VOIDOID: case TRIGGEROID: case INTERNALOID: case OPAQUEOID: case ANYELEMENTOID: case ANYNONARRAYOID: case LANGUAGE_HANDLEROID: case REGPROCOID: case XIDOID: case REGPROCEDUREOID: case REGOPEROID: case REGOPERATOROID: case REGCLASSOID: case REGTYPEOID: case REGCONFIGOID: case REGDICTIONARYOID: case ANYENUMOID: case FDW_HANDLEROID: case HLL_HASHVAL_OID: case SMGROID: return false; default: return true; } } #define BOTH_NOT_NULL(flag1, flag2) (likely(NOT_NULL((flag1) | (flag2)))) #define IS_NULL(flag) (unlikely(((flag)&V_NULL_MASK) == V_NULL_MASK)) #define NOT_NULL(flag) ((((unsigned int)flag) & V_NULL_MASK) == V_NOTNULL_MASK) #define SET_NULL(flag) ((flag) = (flag) | V_NULL_MASK) #define BOTH_NULL(flag1, flag2) (IS_NULL((flag1) & (flag2))) #define SET_NOTNULL(flag) ((flag) = (flag) & (~V_NULL_MASK)) #define BatchIsNull(pBatch) ((pBatch) == NULL || (pBatch)->m_rows == 0) #define VAR_BUF_SIZE 16384 // Retrieve selection vector guarded by selection usage flag // #define SelectionVector(pBatch) ((pBatch)->m_checkSel ? (pBatch)->m_sel : NULL) #define ShallowCopyVector(targetVector, sourceVector) \ ((targetVector).m_rows = (sourceVector).m_rows, \ (targetVector).m_vals = (sourceVector).m_vals, \ (targetVector).m_flag = (sourceVector).m_flag, \ (targetVector).m_buf = (sourceVector).m_buf) struct ScalarDesc : public BaseObject { // Scalar Value type Oid. Oid typeId; // atttypmod records type-specific data supplied at table creation time // The value will generally be -1 for types that do not need typmod. int4 typeMod; // this value means that the value in the scalarvector may be encoded, on storage/pointer to some // where else, may be inlined(in which case it is not encode) // it can be deduct from typeId, bool encoded : 1; ScalarDesc() { typeId = InvalidOid; typeMod = -1; encoded = false; }; }; struct varBuf : public BaseObject { char* buf; int len; int size; varBuf* next; }; class VarBuf : public BaseObject { public: // constructor .deconstructor VarBuf(MemoryContext context); ~VarBuf(); // init void Init(); void Init(int bufLen); void DeInit(bool needFree = true); // reset the buf. void Reset(); // append a binary object char* Append(const char* data, int datalen); // allocate a space. char* Allocate(int datalen); // add a var FORCE_INLINE ScalarValue AddVar(ScalarValue value) { return PointerGetDatum(Append(DatumGetPointer(value), VARSIZE_ANY(value))); } private: // create a buffer; varBuf* CreateBuf(int datalen); varBuf* m_head; varBuf* m_current; MemoryContext m_context; int m_bufNum; int m_bufInitLen; }; // the core data structure for a column class ScalarVector : public BaseObject { friend class VectorBatch; public: // number of values. int m_rows; // type desciption information for this scalar value. ScalarDesc m_desc; // this value means that the value in the scalarvector is always the same bool m_const; // flags in the scalar value array. uint8* m_flag; // a company buffer for store the data if the data type is not plain. VarBuf* m_buf; // the value array. ScalarValue* m_vals; public: // decode a variable length data. // null value judgement should be outside of this function. FORCE_INLINE static Datum Decode(ScalarValue val) { return val; } // convert a datum to scalar value static ScalarValue DatumToScalar(Datum datumVal, Oid datumType, bool isNull); template static ScalarValue DatumToScalarT(Datum datumVal, bool isNull); public: // constructor/deconstructor. ScalarVector(); ~ScalarVector(); // init the ScalarVector. // void init(MemoryContext cxt, ScalarDesc desc); // used in tsdb. init with another ScalarVector object. // void init(MemoryContext cxt, ScalarVector* vec, const int batchSize); // serialize the Scalar vector // void Serialize(StringInfo buf); // serialize the Scalar vector of the particular index // void Serialize(StringInfo buf, int idx); // Deserialize the vector // char* Deserialize(char* msg, size_t len); // Add a variable length data // this var may be from // cstring, fixed length(> 8) data type, or pg traditional header-contain variable length Datum AddVar(Datum data, int index); // Add a header-contain variable Datum AddVarWithHeader(Datum data); // Add a variable without header on a special position. The original variable will be // transfered in together with the length of the content. And inside the funtion, the header // of the ScalarValue will be added before the actual content according to the data type. Datum AddBPCharWithoutHeader(const char* data, int maxLen, int len, int aindex); Datum AddVarCharWithoutHeader(const char* data, int len, int aindex); // Add a short decimal without header on a special position. The value of decimal // will be transfered in by int64 format together with the scale of it. And inside the function, // the header will be added and the value will be converted into PG format. Here we only support // short decimal which can be stored using int64. Datum AddShortNumericWithoutHeader(int64 value, uint8 scale, int aindex); Datum AddBigNumericWithoutHeader(int128 value, uint8 scale, int aindex); char* AddVars(const char* src, int length); // add a normal header-contain val Datum AddHeaderVar(Datum data, int index); // add a cstring type val Datum AddCStringVar(Datum data, int index); // add a fixed length val template Datum AddFixLenVar(Datum data, int index); // copy a vector void copy(ScalarVector* vector, int start_idx, int endIdx); void copy(ScalarVector* vector); void copyDeep(ScalarVector* vector, int start_idx, int endIdx); void copyFlag(ScalarVector* vector, int start_idx, int end_idx); void copyNth(ScalarVector* vector, int Nth); void copy(ScalarVector* vector, const bool* pSel); // convert a cstring to Scalar value. static Datum DatumCstringToScalar(Datum data, Size len); // convert a fixed len datatype to Scalar Value static Datum DatumFixLenToScalar(Datum data, Size len); FORCE_INLINE bool IsNull(int i) { Assert(i >= 0 && i < m_rows); return ((m_flag[i] & V_NULL_MASK) == V_NULL_MASK); } FORCE_INLINE void SetNull(int i) { Assert(i >= 0 && i < BatchMaxSize); m_flag[i] |= V_NULL_MASK; } FORCE_INLINE void SetAllNull() { for (int i = 0; i < m_rows; i++) { SetNull(i); } } private: // init some function pointer. void BindingFp(); Datum (ScalarVector::*m_addVar)(Datum data, int index); }; struct SysColContainer : public BaseObject { int sysColumns; ScalarVector* m_ppColumns; uint8 sysColumpMap[9]; }; #define SelectionVector(pBatch) ((pBatch)->m_checkSel ? (pBatch)->m_sel : NULL) // A batch of vectorize rows // class VectorBatch : public BaseObject { public: // number of rows in the batch. // int m_rows; // number of columns in the batch. // int m_cols; // Shall we check the selection vector. // bool m_checkSel; // Selection vector; // bool* m_sel; // ScalarVector // ScalarVector* m_arr; // SysColumns // SysColContainer* m_sysColumns; // Compress buffer // StringInfo m_pCompressBuf; public: // Many Constructors // VectorBatch(MemoryContext cxt, TupleDesc desc); VectorBatch(MemoryContext cxt, VectorBatch* batch); VectorBatch(MemoryContext cxt, ScalarDesc* desc, int ncols); // Deconstructor. // ~VectorBatch(); // Serialize the particular data index of the batch into the buffer. // void Serialize(StringInfo buf, int idx); // Deserialze the per-row msg into the batch // void Deserialize(char* msg); // Serialize the batch into the buffer without compress. // void SerializeWithoutCompress(StringInfo buf); // Deserialze the msg into the batch without compress. // void DeserializeWithoutDecompress(char* msg, size_t msglen); // Serialize the batch into the buffer with lz4 compress. // void SerializeWithLZ4Compress(StringInfo buf); // Deserialze the compressed msg into the batch with lz4 compress. // void DeserializeWithLZ4Decompress(char* msg, size_t msglen); // Reset // void Reset(bool resetflag = false); void ResetSelection(bool value); // Test the batch is valid or not // bool IsValid(); void FixRowCount(); void FixRowCount(int rows); // Pack the batch // void Pack(const bool *sel); /* Optimzed Pack function */ void OptimizePack(const bool* sel, List* CopyVars); /* Optimzed Pack function for later read. later read cols and ctid col*/ void OptimizePackForLateRead(const bool* sel, List* lateVars, int ctidColIdx); // SysColumns // void CreateSysColContainer(MemoryContext cxt, List* sysVarList); ScalarVector* GetSysVector(int sysColIdx); int GetSysColumnNum(); template void Copy(VectorBatch* batch, int start_idx = 0, int endIdx = -1); void CopyNth(VectorBatch* batchSrc, int Nth); public: /* Pack template function. */ template void PackT(const bool* sel); /* Optimize template function. */ template void OptimizePackT(_in_ const bool* sel, _in_ List* CopyVars); /* Optimize template function for later read. */ template void OptimizePackTForLateRead(_in_ const bool* sel, _in_ List* lateVars, int ctidColIdx); private: // init the vectorbatch. void init(MemoryContext cxt, TupleDesc desc); void init(MemoryContext cxt, VectorBatch* batch); void init(MemoryContext cxt, ScalarDesc* desc, int ncols); }; /* * @Description: copy batch with specific rows * @in batch - current batch to be copyed. * @in startIdx - start index at current batch * @in endIdx - end index at current batch * @template deep - weather a deep copy * @template add - add rows or not */ template void VectorBatch::Copy(VectorBatch* batch, int start_idx, int endIdx) { int copy_end_idx; copy_end_idx = (endIdx == -1) ? batch->m_rows : endIdx; for (int i = 0; i < m_cols; i++) { if (false == add) m_arr[i].m_rows = 0; if (deep) { m_arr[i].copyDeep(&batch->m_arr[i], start_idx, copy_end_idx); } else { m_arr[i].copy(&batch->m_arr[i], start_idx, copy_end_idx); } } if (false == add) m_rows = 0; m_rows += copy_end_idx - start_idx; } template inline ScalarValue ScalarVector::DatumToScalarT(Datum datumVal, bool isNull) { ScalarValue val = 0; Size datumLen; /* length of the datum */ DBG_ASSERT(datumType != InvalidOid); if (!isNull) { if (COL_IS_ENCODE(datumType)) { switch (datumType) { case MACADDROID: val = DatumFixLenToScalar(datumVal, 6); break; case TIMETZOID: case TINTERVALOID: val = DatumFixLenToScalar(datumVal, 12); break; case INTERVALOID: case UUIDOID: val = DatumFixLenToScalar(datumVal, 16); break; case NAMEOID: val = DatumFixLenToScalar(datumVal, 64); break; case UNKNOWNOID: case CSTRINGOID: datumLen = strlen((char*)datumVal); val = DatumCstringToScalar(datumVal, datumLen); break; default: val = datumVal; break; } } else val = datumVal; } return val; } extern Datum ExtractAddrType(Datum* val); extern Datum ExtractFixedType(Datum* val); extern Datum ExtractVarType(Datum* val); extern Datum ExtractCstringType(Datum* val); /* * Convert the scalar value of vector batch to the datum of row tuple. * @_in_param val: The scalar value to be converted. * @return the converted datum to be returned. */ typedef Datum (*ScalarToDatum)(ScalarValue); template Datum convertScalarToDatumT(ScalarValue val) { Datum datum = 0; switch (typid) { case VARCHAROID: { datum = ScalarVector::Decode(val); break; } case TIMETZOID: { char* result = (char*)(ScalarVector::Decode(val)) + VARHDRSZ_SHORT; datum = PointerGetDatum(result); break; } case TIDOID: { datum = PointerGetDatum(val); break; } case UNKNOWNOID: { Datum tmp = ScalarVector::Decode(val); char* result = NULL; if (VARATT_IS_1B(tmp)) { result = (char*)tmp + VARHDRSZ_SHORT; } else { result = (char*)tmp + VARHDRSZ; } datum = PointerGetDatum(result); break; } default: { datum = (Datum)val; break; } } return datum; } #endif /* VECTORBATCH_H_ */