/* * Copyright (c) 2020 Huawei Technologies Co.,Ltd. * * openGauss is licensed under Mulan PSL v2. * You can use this software according to the terms and conditions of the Mulan PSL v2. * You may obtain a copy of Mulan PSL v2 at: * * http://license.coscl.org.cn/MulanPSL2 * * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. * See the Mulan PSL v2 for more details. * --------------------------------------------------------------------------------------- * * threadpool_group.h * Thread pool group controls listener and worker threads. * * IDENTIFICATION * src/include/threadpool/threadpool_group.h * * --------------------------------------------------------------------------------------- */ #ifndef THREAD_POOL_GROUP_H #define THREAD_POOL_GROUP_H #include "c.h" #include "utils/memutils.h" #include "knl/knl_variable.h" #define NUM_THREADPOOL_STATUS_ELEM 8 #define STATUS_INFO_SIZE 256 typedef enum { THREAD_SLOT_UNUSE = 0, THREAD_SLOT_INUSE } ThreadSlotStatus; struct ThreadSentryStatus { int spawntick; TimestampTz lastSpawnTime; ThreadSlotStatus slotStatus; }; struct ThreadWorkerSentry { ThreadSentryStatus stat; ThreadPoolWorker* worker; pthread_mutex_t mutex; pthread_cond_t cond; }; struct ThreadStreamSentry { ThreadSentryStatus stat; ThreadPoolStream* stream; pthread_mutex_t mutex; pthread_cond_t cond; }; typedef struct ThreadPoolStat { int groupId; int numaId; int bindCpuNum; int listenerNum; char workerInfo[STATUS_INFO_SIZE]; char sessionInfo[STATUS_INFO_SIZE]; char streamInfo[STATUS_INFO_SIZE]; } ThreadPoolStat; class ThreadPoolGroup : public BaseObject { public: ThreadPoolListener* m_listener; ThreadPoolGroup(int maxWorkerNum, int expectWorkerNum, int maxStreamNum, int groupId, int numaId, int cpuNum, int* cpuArr, bool enableBindCpuNuma); ~ThreadPoolGroup(); void Init(bool enableNumaDistribute); void InitWorkerSentry(); void ReleaseWorkerSlot(int i); void AddWorker(int i); ThreadId AddStream(StreamProducer* producer); void ShutDownThreads(); void AddWorkerIfNecessary(); bool EnlargeWorkers(int enlargeNum); void ReduceWorkers(int reduceNum); void ShutDownPendingWorkers(); void ReduceStreams(); void WaitReady(); float4 GetSessionPerThread(); void GetThreadPoolGroupStat(ThreadPoolStat* stat); /* get ready session list check for hang */ bool IsGroupTooBusy(); bool CheckGroupHang(); /* check for too busy flag */ void SetGroupTooBusy(bool isTooBusy); bool isGroupAlreadyTooBusy(); inline ThreadPoolListener* GetListener() { return m_listener; } inline int GetGroupId() { return m_groupId; } inline int GetNumaId() { return m_numaId; } inline bool AllSessionClosed() { return (m_sessionCount <= 0); } inline bool AllThreadShutDown() { return (m_workerNum <= 0); } ThreadId GetStreamFromPool(StreamProducer* producer); void ReturnStreamToPool(Dlelem* elem); void RemoveStreamFromPool(Dlelem* elem, int idx); void InitStreamSentry(); inline bool HasFreeStream() { return (m_idleStreamNum > 0); } friend class ThreadPoolWorker; friend class ThreadPoolListener; friend class ThreadPoolScheduler; private: void AttachThreadToCPU(ThreadId thread, int cpu); void AttachThreadToNodeLevel(ThreadId thread) const; void AttachThreadToCpuNuma(ThreadId thread); private: int m_maxWorkerNum; int m_maxStreamNum; int m_defaultWorkerNum; volatile int m_workerNum; volatile int m_listenerNum; volatile int m_expectWorkerNum; volatile int m_idleWorkerNum; volatile int m_pendingWorkerNum; volatile int m_streamNum; volatile int m_idleStreamNum; volatile int m_sessionCount; // all session count; volatile int m_waitServeSessionCount; // wait for worker to server volatile int m_processTaskCount; volatile int m_isTooBusy; int m_groupId; int m_numaId; int m_groupCpuNum; int* m_groupCpuArr; bool m_enableNumaDistribute; bool m_enableBindCpuNuma; cpu_set_t m_nodeCpuSet; /* for numa node distribution only */ cpu_set_t m_CpuNumaSet; /* for numa node distribution only */ ThreadWorkerSentry* m_workers; MemoryContext m_context; pthread_mutex_t m_mutex; ThreadStreamSentry* m_streams; DllistWithLock* m_freeStreamList; instr_time m_current_time; uint64 m_sessionId; }; #endif /* THREAD_POOL_GROUP_H */