/* * Copyright (c) 2020 Huawei Technologies Co.,Ltd. * * openGauss is licensed under Mulan PSL v2. * You can use this software according to the terms and conditions of the Mulan PSL v2. * You may obtain a copy of Mulan PSL v2 at: * * http://license.coscl.org.cn/MulanPSL2 * * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. * See the Mulan PSL v2 for more details. * --------------------------------------------------------------------------------------- * * stmt_retry.h * support for statement retry * * * IDENTIFICATION * src/include/tcop/stmt_retry.h * * --------------------------------------------------------------------------------------- */ #ifndef STMT_RETRY_H #define STMT_RETRY_H #include "c.h" #include "lib/stringinfo.h" #include "utils/palloc.h" #include "utils/elog.h" #include "utils/hsearch.h" /* * test stub usage */ const uint16 RETRY_STUB_CASE_SIZE = 256; const uint16 RETRY_STUB_CASE_ECODE_SIZE = 21; typedef enum { STUB_DO_RETRY, STUB_PASS } RETRY_STUB_MARKER; /* * log usage */ const char* const STUB_PRINT_PREFIX = " [retry stub] "; const char* const STUB_PRINT_PREFIX_ALERT = " [retry stub alert] "; const char* const STUB_PRINT_PREFIX_TYPE_S = " ~special path~ "; const char* const STUB_PRINT_PREFIX_TYPE_R = " ~retry path~ "; const char* const PRINT_PREFIX_TYPE_ALERT = " alert : "; const char* const PRINT_PREFIX_TYPE_PATH = " retry path : "; /* * marco for statement retry */ #define STMT_RETRY_ENABLED \ (u_sess->attr.attr_common.max_query_retry_times > 0 && !u_sess->attr.attr_sql.enable_hadoop_env) /*message types which retry is supported*/ const char Q_MESSAGE = 'Q'; const char P_MESSAGE = 'P'; const char B_MESSAGE = 'B'; const char E_MESSAGE = 'E'; const char D_MESSAGE = 'D'; const char C_MESSAGE = 'C'; const char S_MESSAGE = 'S'; const char U_MESSAGE = 'U'; const char F_MESSAGE = 'F'; const char INVALID_MESSAGE = '0'; const char* const STMT_TOKEN = ";"; inline bool IsExtendQueryMessage(char msg_type) { return ((msg_type == P_MESSAGE) || (msg_type == B_MESSAGE) || (msg_type == E_MESSAGE) || (msg_type == D_MESSAGE) || (msg_type == C_MESSAGE) || (msg_type == S_MESSAGE) || (msg_type == U_MESSAGE) || (msg_type == F_MESSAGE)) ? true : false; } inline bool IsSimpleQueryMessage(char msg_type) { return (msg_type == Q_MESSAGE); } inline bool IsQueryMessage(char msg_type) { return (IsSimpleQueryMessage(msg_type) || IsExtendQueryMessage(msg_type)); } inline bool IsMessageTypeMatchWithProtocol(bool is_extend_query, char msg_type) { return is_extend_query ? IsExtendQueryMessage(msg_type) : IsSimpleQueryMessage(msg_type); } /* * QueryMessageContext is used to cache retry message context info for now * in the future. */ class QueryMessageContext : public BaseObject { public: QueryMessageContext(MemoryContext mem_context) : context_(mem_context) { MemoryContext old_context = MemoryContextSwitchTo(context_); initStringInfo(&cached_msg_); (void)MemoryContextSwitchTo(old_context); } ~QueryMessageContext(void) {} public: void CacheMessage(char msg_type, StringInfo msg_data); int MessageSize(void) const { return cached_msg_.len; } void DumpHistoryCommandsToPqBuffer(void); void Reset(void); char PeekFirstChar(void) const { return cached_msg_.data[0]; }; private: /* disallow copy*/ QueryMessageContext(const QueryMessageContext&); QueryMessageContext& operator=(const QueryMessageContext&); private: StringInfoData cached_msg_; /* If 'P' 'B' 'E''D', msg cached from input_message, If 'Q' cached from * debug_query string */ MemoryContext context_; }; typedef enum { STMT_RETRY_DEFAULT, /*default status of retry, meaning retry is not triggered*/ STMT_RETRY_SIMPLE_QUREY_RETRYING, /*indicate we are in simple query retry flow*/ STMT_RETRY_EXTEND_QUREY_RETRYING, /*indicate we are in pbe retry flow*/ } StatementRetryPhase; typedef StatementRetryPhase CombinedStatementRetryPhase; typedef struct MessageInfo { char type; int len; MessageInfo(char _type, int _len) : type(_type), len(_len) {} } MessageInfo; /* * since pbe messages in extend query executed in a continues way, for example messages flow can be * combination of "p-b-d-e-b-e-s", in extend query retry, we treat pbe message flow as a whole, * PBEFlowTracker track pbe flow and using tracked pbe flow info to check retrying pbe execution if * retry happens. */ class PBEFlowTracker : public BaseObject { public: PBEFlowTracker(MemoryContext mem_context) { MemoryContext old_context = MemoryContextSwitchTo(mem_context); initStringInfo(&pbe_flow_); initStringInfo(&history_pbe_flow_); (void)MemoryContextSwitchTo(old_context); } ~PBEFlowTracker(void) {} void Reset(void) { resetStringInfo(&pbe_flow_); resetStringInfo(&history_pbe_flow_); } public: void TrackPBE(char type) { appendStringInfoChar(&pbe_flow_, type); } void InitValidating(void) { if (pbe_flow_.len > 0) { copyStringInfo(&history_pbe_flow_, &pbe_flow_); resetStringInfo(&pbe_flow_); } } void Validate(char type); const char* PBEFlowStr(void) { return pbe_flow_.data; } private: StringInfoData pbe_flow_; /* recording executed pbe message flow */ StringInfoData history_pbe_flow_; /* recording executed pbe message flow before retrying */ }; /* * stub for statement retry, use it to Manually construct sql retry both in extended query and simple query */ class StatementRetryStub { public: StatementRetryStub(void) { InitECodeMarker(); Reset(); } ~StatementRetryStub(void) {} void Reset(void); public: void StartOneStubTest(char msg_type); bool OnStubTest(void) const { return on_stub_test; } void CloseOneStub(void); void FinishStubTest(const char* info); void InitECodeMarker(void); void ECodeStubTest(void); void ECodeValidate(void); private: /* disallow copy*/ StatementRetryStub(const StatementRetryStub&); StatementRetryStub& operator=(const StatementRetryStub&); private: uint8 stub_marker[RETRY_STUB_CASE_SIZE]; int ecode_marker[RETRY_STUB_CASE_ECODE_SIZE]; uint16 stub_pos; uint16 ecode_pos; bool on_stub_test; }; /* * this is the main class of statement retry control * 1.A StatementRetryController will track a simple query or a extend query(PBE flow) * during query execution, tracking behave like cache query string in simple query, * record p message pointer in extend query. * 2.once query retry is triggerd, StatementRetryController will control the retry routine, * such as how to settle the read/send buffer between cn and client, how to ReadCommand and so on */ class StatementRetryController : public BaseObject { public: StatementRetryController(MemoryContext mem_context) : retry_id(0), is_retry_enabled(false), cached_msg_context(mem_context), pbe_tracker(mem_context) { MemoryContext old_context = MemoryContextSwitchTo(mem_context); initStringInfo(&prepared_stmt_name); (void)MemoryContextSwitchTo(old_context); Reset(); } ~StatementRetryController(void) {} void Reset(void); public: /* common usage */ void EnableRetry(void); void DisableRetry(void); bool IsRetryEnabled(void) { return is_retry_enabled; } bool IsExtendQueryRetrying(void) const { return STMT_RETRY_EXTEND_QUREY_RETRYING == retry_phase; } bool IsSimpleQueryRetrying(void) const { return STMT_RETRY_SIMPLE_QUREY_RETRYING == retry_phase; } bool IsQueryRetrying(void) const { return (IsExtendQueryRetrying() || IsSimpleQueryRetrying()); } bool IsTrackedQueryMessageInfoValid(bool is_extend_query); void TriggerRetry(bool is_extend_query); void FinishRetry(void); void CleanPreparedStmt(void); void LogTraceInfo(MessageInfo info) { if (retry_phase > STMT_RETRY_DEFAULT) { ereport(DEBUG3, (errmodule(MOD_CN_RETRY), errmsg("retrying \"%c\" message, length \"%d\"", info.type, info.len))); } else { ereport(DEBUG3, (errmodule(MOD_CN_RETRY), errmsg("executing \"%c\" message, length \"%d\"", info.type, info.len))); } } /* message caching */ void CacheCommand(char type, StringInfo data) { cached_msg_context.CacheMessage(type, data); } void TrackMessageOnExecuting(char type) { #if USE_ASSERT_CHECKING if (IsExtendQueryMessage(type)) pbe_tracker.TrackPBE(type); #endif executing_msg_type = type; } void CacheStmtName(const char* stmt_name) { appendStringInfoString(&prepared_stmt_name, stmt_name); appendStringInfoString(&prepared_stmt_name, STMT_TOKEN); } char MessageOnExecuting(void) const { return executing_msg_type; } /* pbe flow tracking */ void ValidateExecuting(char type) { pbe_tracker.Validate(type); } const char* PBEFlowStr(void) { return pbe_tracker.PBEFlowStr(); } public: /*in order to compatible with legacy code, declaring all data member as public here */ uint64 retry_id; /* id for retrying query */ int retry_times; /* mark current retry times */ int error_code; /* recording error code that trigger long jump */ bool is_trans; /* if the query within either a transaction or a transaction block, * it is not allowed to retry */ bool is_tempfile_size_exceeded; /* if true, meaning tempfile for caching query result is exceeded, * so there is no way to make sure the query result can be exact, * in this situation retry can not be allowed */ bool is_unsupported_query_type; /* if true, meaning query type can't retry for now */ bool is_retry_enabled; /* if true, meaning retry feature is enabled, * now StatementRetryController will track query and trigger retry at long jump point */ bool is_transaction_committed; /* if true, meaning current transaction is committed, if error happens between transaction committed and finish, we disallow retry during this period */ CombinedStatementRetryPhase retry_phase; /* recording current retry phase */ QueryMessageContext cached_msg_context; /* cached statement context store statement info of a statement to * be retry subsequently */ #ifdef USE_RETRY_STUB StatementRetryStub stub_; /* test stub for auto trigger sql retry */ #endif private: char executing_msg_type; /* tracking msg on executing */ PBEFlowTracker pbe_tracker; /* tracking pbe flow execution */ StringInfoData prepared_stmt_name; /* caching prepared statement name */ }; /* *statement retry common usage */ extern StatementRetryController* StmtRetryInitController(void); extern void StmtRetryResetController(void); extern void StmtRetrySetFileExceededFlag(void); extern void StmtRetrySetQuerytUnsupportedFlag(void); extern void StmtRetrySetTransactionCommitFlag(bool flag); extern void StmtRetryValidate(StatementRetryController* controller); extern void StmtRetryDisable(void); extern bool IsStmtRetryEnabled(void); extern bool IsStmtRetryQueryRetrying(void); extern bool IsStmtRetryAvaliable(int elevel, int sqlErrCode); extern bool IsStmtRetryCapable(StatementRetryController* controller, bool is_extend_query); extern bool IsStmtNeedRetryByErrCode(const char* ecode_str, int errlevel); /* * lnline functions */ /* * @Description: check if the sql statement need to retry by error code. * @in ecode - error code type * @in elevel - elevel type * @return - true, statement retry is needed by error code. */ inline bool IsStmtNeedRetryByErrCode(int ecode, int errlevel) { const char* ecode_str = plpgsql_get_sqlstate(ecode); return IsStmtNeedRetryByErrCode(ecode_str, errlevel); } /* * @Description: filter statement retry by application name, filtering sql from application such as cm_agent and gs_clean, not to retry. * @in name - application name * @return */ inline bool IsStmtNeedRetryByApplicationName(const char* name) { return ((name != NULL) && (strcmp(name, "cm_agent") != 0) && (strcmp(name, "gs_clean") != 0) && (strcmp(name, "gc_fdw") != 0)); } #endif /*file end stmt_retry.h */