@@ -100,6 +100,7 @@ char *g_unpooled_user = "mls_admin";
100100
101101bool PoolConnectDebugPrint = false; /* Pooler connect debug print */
102102bool PoolerStuckExit = true; /* Pooler exit when stucked */
103+ bool PoolSubThreadLogPrint = true; /* Pooler sub thread log print */
103104
104105#define POOL_ASYN_WARM_PIPE_LEN 32 /* length of asyn warm pipe */
105106#define POOL_ASYN_WARN_NUM 1 /* how many connections to warm once maintaince per node pool */
@@ -397,6 +398,20 @@ typedef struct
397398 pg_time_t cmd_end_time ; /* command end time */
398399}PGXCPoolAsyncReq ;
399400
401+ static void pooler_subthread_write_log (int elevel , int lineno , const char * filename , const char * funcname , const char * fmt , ...)__attribute__((format (printf , 5 , 6 )));
402+
403+ /* Use this macro when a sub thread needs to print logs */
404+ #define pooler_thread_logger (elevel , ...) \
405+ do { \
406+ pooler_subthread_write_log(elevel, __LINE__, __FILE__, PG_FUNCNAME_MACRO, __VA_ARGS__); \
407+ } while(0)
408+
409+ #define FORMATTED_TS_LEN (128) /* format timestamp buf length */
410+ #define POOLER_WRITE_LOG_ONCE_LIMIT (5) /* number of logs written at a time */
411+ #define MAX_THREAD_LOG_PIPE_LEN (2 * 1024) /* length of thread log pipe */
412+ #define DEFAULT_LOG_BUF_LEN (1024) /* length of thread log length */
413+ PGPipe * g_ThreadLogQueue = NULL ;
414+
400415static inline void RebuildAgentIndex (void );
401416
402417static inline PGXCASyncTaskCtl * create_task_control (List * datanodelist , List * coordlist , int32 * fd_result , int32 * pid_result );
@@ -5134,6 +5149,170 @@ destroy_node_pool_free_slots(PGXCNodePool *node_pool)
51345149 }
51355150}
51365151
5152+ /*
5153+ * setup current log time
5154+ */
5155+ static void
5156+ setup_formatted_current_log_time (char * formatted_current_log_time )
5157+ {
5158+ pg_time_t stamp_time ;
5159+ char msbuf [13 ];
5160+ struct timeval timeval ;
5161+
5162+ gettimeofday (& timeval , NULL );
5163+ stamp_time = (pg_time_t ) timeval .tv_sec ;
5164+
5165+ /*
5166+ * Note: we expect that guc.c will ensure that log_timezone is set up (at
5167+ * least with a minimal GMT value) before Log_line_prefix can become
5168+ * nonempty or CSV mode can be selected.
5169+ */
5170+ pg_strftime (formatted_current_log_time , FORMATTED_TS_LEN ,
5171+ /* leave room for milliseconds... */
5172+ "%Y-%m-%d %H:%M:%S %Z" ,
5173+ pg_localtime (& stamp_time , log_timezone ));
5174+
5175+ /* 'paste' milliseconds into place... */
5176+ sprintf (msbuf , ".%03d" , (int ) (timeval .tv_usec / 1000 ));
5177+ memcpy (formatted_current_log_time + 19 , msbuf , 4 );
5178+ }
5179+
5180+ /*
5181+ * write pooler's subthread log into thread log queue
5182+ * only call by pooler's subthread in elog
5183+ */
5184+ static void
5185+ pooler_subthread_write_log (int elevel , int lineno , const char * filename , const char * funcname , const char * fmt , ...)
5186+ {
5187+ char * buf = NULL ;
5188+ int buf_len = 0 ;
5189+ int offset = 0 ;
5190+ char formatted_current_log_time [FORMATTED_TS_LEN ];
5191+
5192+ if (!PoolSubThreadLogPrint )
5193+ {
5194+ /* not enable sun thread log print, return */
5195+ return ;
5196+ }
5197+
5198+ if (PipeIsFull (g_ThreadLogQueue ))
5199+ {
5200+ return ;
5201+ }
5202+
5203+ /* use malloc in sub thread */
5204+ buf_len = strlen (filename ) + strlen (funcname ) + DEFAULT_LOG_BUF_LEN ;
5205+ buf = (char * )malloc (buf_len );
5206+ if (buf == NULL )
5207+ {
5208+ /* no log */
5209+ return ;
5210+ }
5211+
5212+ /* construction log, format: elevel | lineno | filename | funcname | log content */
5213+ * (int * )(buf + offset ) = elevel ;
5214+ offset += sizeof (elevel );
5215+ * (int * )(buf + offset ) = lineno ;
5216+ offset += sizeof (lineno );
5217+ memcpy (buf + offset , filename , strlen (filename ) + 1 );
5218+ offset += (strlen (filename ) + 1 );
5219+ memcpy (buf + offset , funcname , strlen (funcname ) + 1 );
5220+ offset += (strlen (funcname ) + 1 );
5221+
5222+ /*
5223+ * because the main thread writes the log of the sub thread asynchronously,
5224+ * record the actual log writing time here
5225+ */
5226+ setup_formatted_current_log_time (formatted_current_log_time );
5227+ memcpy (buf + offset , formatted_current_log_time , strlen (formatted_current_log_time ));
5228+ offset += strlen (formatted_current_log_time );
5229+ * (char * )(buf + offset ) = ' ' ;
5230+ offset += sizeof (char );
5231+
5232+ /* Generate actual output --- have to use appendStringInfoVA */
5233+ for (;;)
5234+ {
5235+ va_list args ;
5236+ int avail ;
5237+ int nprinted ;
5238+
5239+ avail = buf_len - offset - 1 ;
5240+ va_start (args , fmt );
5241+ nprinted = vsnprintf (buf + offset , avail , fmt , args );
5242+ va_end (args );
5243+ if (nprinted >= 0 && nprinted < avail - 1 )
5244+ {
5245+ offset += nprinted ;
5246+ * (char * )(buf + offset ) = '\0' ;
5247+ offset += sizeof (char );
5248+ break ;
5249+ }
5250+
5251+ buf_len = (buf_len * 2 > (int ) MaxAllocSize ) ? MaxAllocSize : buf_len * 2 ;
5252+ buf = (char * ) realloc (buf , buf_len );
5253+ if (buf == NULL )
5254+ {
5255+ /* no log */
5256+ return ;
5257+ }
5258+ }
5259+
5260+ /* put log into thread log queue, drop log if queue is full */
5261+ if (-1 == PipePut (g_ThreadLogQueue , buf ))
5262+ {
5263+ free (buf );
5264+ }
5265+ }
5266+
5267+ /*
5268+ * write subthread log in main thread
5269+ */
5270+ static void
5271+ pooler_handle_subthread_log (bool is_pooler_exit )
5272+ {
5273+ int write_log_cnt = 0 ;
5274+ int offset = 0 ;
5275+ int elevel = LOG ;
5276+ int lineno = 0 ;
5277+ char * log_buf = NULL ;
5278+ char * filename = NULL ;
5279+ char * funcname = NULL ;
5280+ char * log_content = NULL ;
5281+
5282+ while ((log_buf = (char * )PipeGet (g_ThreadLogQueue )) != NULL )
5283+ {
5284+ /* elevel | lineno | filename | funcname | log content */
5285+ elevel = * (int * )log_buf ;
5286+ offset = sizeof (elevel );
5287+ lineno = * (int * )(log_buf + offset );
5288+ offset += sizeof (lineno );
5289+ filename = log_buf + offset ;
5290+ offset += (strlen (filename ) + 1 );
5291+ funcname = log_buf + offset ;
5292+ offset += (strlen (funcname ) + 1 );
5293+ log_content = log_buf + offset ;
5294+
5295+ /* write log here */
5296+ elog_start (filename , lineno ,
5297+ #ifdef USE_MODULE_MSGIDS
5298+ PGXL_MSG_MODULE , PGXL_MSG_FILEID , __COUNTER__ ,
5299+ #endif
5300+ funcname );
5301+ elog_finish (elevel , "%s" , log_content );
5302+
5303+ free (log_buf );
5304+
5305+ /*
5306+ * if the number of logs written at one time exceeds POOLER_WRITE_LOG_ONCE_LIMIT,
5307+ * in order not to block the main thread, return here
5308+ */
5309+ if (write_log_cnt ++ >= POOLER_WRITE_LOG_ONCE_LIMIT && !is_pooler_exit )
5310+ {
5311+ return ;
5312+ }
5313+ }
5314+ }
5315+
51375316/*
51385317 * Main handling loop
51395318 */
@@ -5200,6 +5379,9 @@ PoolerLoop(void)
52005379 }
52015380#endif
52025381
5382+ /* create log queue */
5383+ g_ThreadLogQueue = CreatePipe (MAX_THREAD_LOG_PIPE_LEN );
5384+
52035385 /* create utility thread */
52045386 g_AsynUtilityPipeSender = CreatePipe (POOL_ASYN_WARM_PIPE_LEN );
52055387 ThreadSemaInit (& g_AsnyUtilitysem , 0 );
@@ -5282,6 +5464,7 @@ PoolerLoop(void)
52825464 */
52835465 if (!PostmasterIsAlive ())
52845466 {
5467+ pooler_handle_subthread_log (true);
52855468 exit (1 );
52865469 }
52875470
@@ -5309,6 +5492,7 @@ PoolerLoop(void)
53095492 * Just close the socket and exit. Linux will help to release the resouces.
53105493 */
53115494 close (server_fd );
5495+ pooler_handle_subthread_log (true);
53125496 exit (0 );
53135497 }
53145498
@@ -5420,6 +5604,9 @@ PoolerLoop(void)
54205604 check_duplicate_allocated_conn ();
54215605#endif
54225606 print_pooler_statistics ();
5607+
5608+ /* handle sub thread's log */
5609+ pooler_handle_subthread_log (false);
54235610 }
54245611}
54255612
0 commit comments