diff --git a/bbinc/thdpool.h b/bbinc/thdpool.h index 17c9d8b048..664506bd2a 100644 --- a/bbinc/thdpool.h +++ b/bbinc/thdpool.h @@ -68,7 +68,8 @@ typedef void (*thdpool_thddque_fn)(struct thdpool *pool, struct workitem *item, typedef void (*thdpool_foreach_fn)(struct thdpool *pool, struct workitem *item, void *user); void thdpool_foreach(struct thdpool *pool, thdpool_foreach_fn, void *user); - +typedef void (*thdpool_for_each_thd_fn)(struct thdpool *pool, pthread_t tid, int idle, void *thddata, void *user); +void thdpool_for_each_thd(struct thdpool *pool, thdpool_for_each_thd_fn, void *user); struct thdpool *thdpool_create(const char *name, size_t per_thread_data_sz); int thdpool_destroy(struct thdpool **pool_p, int coopWaitUs); void thdpool_set_stack_size(struct thdpool *pool, size_t sz_bytes); diff --git a/bdb/bdb_thd_io.c b/bdb/bdb_thd_io.c index 6c9fbf7df5..fe2acd7c2e 100644 --- a/bdb/bdb_thd_io.c +++ b/bdb/bdb_thd_io.c @@ -156,29 +156,6 @@ static ssize_t bdb_write(int fd, const void *buf, size_t nbytes) return rc; } -void bdb_set_io_control(void (*start)(), void (*cmplt)()) -{ - logmsg(LOGMSG_DEBUG, "IO CONTROL disabled\n"); - if (1) - return; -#if 0 - io_start=start; - io_cmplt=cmplt; - - if - ( - db_env_set_func_read(bdb_read) || - db_env_set_func_fsync(bdb_fsync) || - db_env_set_func_write(bdb_write) - ) - { - printf("**FAILED db_env_set_func_pread RC\n"); - abort(); - } - printf("SET UP THD IO CONTROL\n"); -#endif -} - void bdb_get_iostats(int *n_reads, int *l_reads, int *n_writes, int *l_writes) { *n_reads = norm_reads; diff --git a/db/comdb2.h b/db/comdb2.h index abe5b1586d..020704b8af 100644 --- a/db/comdb2.h +++ b/db/comdb2.h @@ -1292,6 +1292,7 @@ struct osql_sess { pthread_mutex_t participant_lk; int is_done; int is_sanctioned; /* set by fdb from coordinator-master */ + int is_qconsume_only; }; typedef struct osql_sess osql_sess_t; @@ -1834,6 +1835,9 @@ extern int gbl_appsock_pooling; extern struct thdpool *gbl_appsock_thdpool; extern struct thdpool 
*gbl_osqlpfault_thdpool; extern struct thdpool *gbl_udppfault_thdpool; +extern struct thdpool *gbl_handle_buf_write_thdpool; +extern struct thdpool *gbl_handle_buf_read_thdpool; +extern struct thdpool *gbl_handle_buf_queue_thdpool; extern int gbl_consumer_rtcpu_check; extern int gbl_node1rtcpuable; diff --git a/db/db_tunables.c b/db/db_tunables.c index 2c6068c157..8994ee4fbb 100644 --- a/db/db_tunables.c +++ b/db/db_tunables.c @@ -566,6 +566,8 @@ extern int gbl_sc_history_max_rows; extern int gbl_sc_status_max_rows; extern int gbl_rep_process_pstack_time; extern int gbl_sql_recover_time; +extern int gbl_queue_use_dedicated_writers; +extern int gbl_queue_max_dedicated_writers; extern void set_snapshot_impl(snap_impl_enum impl); extern const char *snap_impl_str(snap_impl_enum impl); diff --git a/db/db_tunables.h b/db/db_tunables.h index e70bbcf118..907d4efd08 100644 --- a/db/db_tunables.h +++ b/db/db_tunables.h @@ -2499,4 +2499,6 @@ REGISTER_TUNABLE("sc_status_max_rows", "Max number of rows returned in comdb2_sc REGISTER_TUNABLE("rep_process_pstack_time", "pstack the server if rep_process runs longer than time specified in secs (Default: 30s)", TUNABLE_INTEGER, &gbl_rep_process_pstack_time, 0, NULL, NULL, NULL, NULL); REGISTER_TUNABLE("sql_recover_time", "Number of msec before checking if SQL has waiters. 0 will disable. (Default: 10ms)", TUNABLE_INTEGER, &gbl_sql_recover_time, 0, NULL, NULL, NULL, NULL); +REGISTER_TUNABLE("queue_use_dedicated_writers", "Whether queue-consumes are processed in dedicated writers. (Default: on)", TUNABLE_BOOLEAN, &gbl_queue_use_dedicated_writers, 0, NULL, NULL, NULL, NULL); +REGISTER_TUNABLE("queue_max_dedicated_writers", "Max number of dedicated queue-consume writers. 
(Default: 16)", TUNABLE_INTEGER, &gbl_queue_max_dedicated_writers, 0, NULL, NULL, NULL, NULL); #endif /* _DB_TUNABLES_H */ diff --git a/db/handle_buf.c b/db/handle_buf.c index 2fafdee3fb..d32c93e8b5 100644 --- a/db/handle_buf.c +++ b/db/handle_buf.c @@ -63,131 +63,46 @@ enum THD_EV { THD_EV_END = 0, THD_EV_START = 1 }; /* request pool & queue */ -static pool_t *p_reqs; /* request pool */ - -struct dbq_entry_t { - LINKC_T(struct dbq_entry_t) qlnk; - LINKC_T(struct dbq_entry_t) rqlnk; - time_t queue_time_ms; - void *obj; -}; - -static pool_t *pq_reqs; /* queue entry pool */ - pool_t *p_bufs; /* buffer pool for socket requests */ pool_t *p_slocks; /* pool of socket locks*/ -LISTC_T(struct dbq_entry_t) q_reqs; /* all queued requests */ -static LISTC_T(struct dbq_entry_t) rq_reqs; /* queue of read requests */ - /* thread pool */ /* thread associated with this request */ struct thd { - pthread_t tid; - pthread_cond_t wakeup; struct ireq *iq; LINKC_T(struct thd) lnk; // extensions to allow calling thd_req inline int do_inline; - int inited; struct thr_handle *thr_self; }; -static pool_t *p_thds; -static LISTC_T(struct thd) idle; /*idle thread.*/ -static LISTC_T(struct thd) busy; /*busy thread.*/ - -static int write_thd_count = 0; - static int is_req_write(int opcode); -static pthread_mutex_t lock; pthread_mutex_t buf_lock = PTHREAD_MUTEX_INITIALIZER; -static pthread_attr_t attr; #ifdef MONITOR_STACK static size_t stack_sz; static comdb2ma stack_alloc; #endif -static pthread_cond_t coalesce_wakeup; -static unsigned coalesce_waiters = 0; -static unsigned coalesce_reqthd_waiters = 0; - /* stats */ - -enum { MAXSTAT = 200 }; - -static int nreqs; -static int nthdcreates; -static int nwaits; -static int nqfulls; static int nerrs; -static int nretire; -static int bkt_thd[MAXSTAT]; -static int bkt_queue[MAXSTAT]; - -static int iothreads = 0, waitthreads = 0; - -void test_the_lock(void) -{ - LOCK(&lock) {} - UNLOCK(&lock) -} -static void thd_io_start(void) -{ - LOCK(&lock) {
iothreads++; } - UNLOCK(&lock); -} - -static void thd_io_complete(void) -{ - LOCK(&lock) { iothreads--; } - UNLOCK(&lock); -} +static void handle_buf_thd_start(struct thdpool *pool, void *thddata); +static void handle_buf_thd_stop(struct thdpool *pool, void *thddata); +static struct thdpool *create_handle_buf_thdpool(const char *name, int nthds); void thd_cleanup() { - int counter = 0; - while (nthdcreates > nretire && counter++ < gbl_thd_linger) - sleep(1); - - if (nthdcreates > nretire) - abort(); - - LISTC_CLEAN(&busy, lnk, 0, struct thd); - LISTC_CLEAN(&idle, lnk, 0, struct thd); - LISTC_CLEAN(&rq_reqs, rqlnk, 0, struct dbq_entry_t); - LISTC_CLEAN(&q_reqs, qlnk, 0, struct dbq_entry_t); - pool_clear(pq_reqs); - pool_clear(p_slocks); - pool_clear(p_bufs); - pool_clear(p_reqs); - pool_clear(p_thds); - Pthread_cond_destroy(&coalesce_wakeup); - Pthread_attr_destroy(&attr); - Pthread_mutex_destroy(&lock); + thdpool_destroy(&gbl_handle_buf_write_thdpool, 0); + thdpool_destroy(&gbl_handle_buf_read_thdpool, 0); + thdpool_destroy(&gbl_handle_buf_queue_thdpool, 0); } int thd_init(void) { - Pthread_mutex_init(&lock, 0); - Pthread_attr_init(&attr); - Pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); - Pthread_cond_init(&coalesce_wakeup, NULL); - p_thds = pool_setalloc_init(sizeof(struct thd), 0, malloc, free); - if (p_thds == 0) { - logmsg(LOGMSG_ERROR, "thd_init:failed thd pool init"); - return -1; - } - p_reqs = pool_setalloc_init(sizeof(struct ireq), 0, malloc, free); - if (p_reqs == 0) { - logmsg(LOGMSG_ERROR, "thd_init:failed req pool init"); - return -1; - } p_bufs = pool_setalloc_init(MAX_BUFFER_SIZE, 64, malloc, free); if (p_bufs == 0) { logmsg(LOGMSG_ERROR, "thd_init:failed buf pool init"); @@ -198,44 +113,31 @@ int thd_init(void) logmsg(LOGMSG_ERROR, "thd_init:failed sock lock pool init"); return -1; } - pq_reqs = pool_setalloc_init(sizeof(struct dbq_entry_t), 64, malloc, free); - if (pq_reqs == 0) { - logmsg(LOGMSG_ERROR, "thd_init:failed queue req 
pool init"); - return -1; - } - - listc_init(&q_reqs, offsetof(struct dbq_entry_t, qlnk)); - listc_init(&rq_reqs, offsetof(struct dbq_entry_t, rqlnk)); - -#ifdef MONITOR_STACK - stack_sz = 4096 * 1024; - stack_alloc = comdb2ma_create_with_scope(0, 0, "stack", "tag", 1); - if (stack_alloc == NULL) { - logmsg(LOGMSG_ERROR, "thd_init: failed to initialize thread subsystem\n"); - return 1; - } -#endif - - listc_init(&idle, offsetof(struct thd, lnk)); - listc_init(&busy, offsetof(struct thd, lnk)); - bdb_set_io_control(thd_io_start, thd_io_complete); + gbl_handle_buf_write_thdpool = create_handle_buf_thdpool("handle_buf_write", gbl_maxwthreads); + gbl_handle_buf_read_thdpool = create_handle_buf_thdpool("handle_buf_read", gbl_maxthreads); + gbl_handle_buf_queue_thdpool = create_handle_buf_thdpool("handle_buf_queue", gbl_maxwthreads); Pthread_key_create(&thd_info_key, free); - Pthread_attr_setstacksize(&attr, 4096 * 1024); logmsg(LOGMSG_INFO, "thd_init: thread subsystem initialized\n"); return 0; } +#define THDPOOLS_COUNTER(c) \ + (thdpool_get_ ## c (gbl_handle_buf_read_thdpool) + \ + thdpool_get_ ## c (gbl_handle_buf_write_thdpool) + \ + thdpool_get_ ## c (gbl_handle_buf_queue_thdpool)) + +#define THDPOOLS_WRITER_COUNTER(c) \ + (thdpool_get_ ## c (gbl_handle_buf_write_thdpool) + \ + thdpool_get_ ## c (gbl_handle_buf_queue_thdpool)) + void thd_stats(void) { - int ii, jj; - logmsg(LOGMSG_USER, "num reqs %d\n", nreqs); - logmsg(LOGMSG_USER, "num waits %d\n", nwaits); - logmsg(LOGMSG_USER, "num items on queue %d\n", q_reqs.count); - logmsg(LOGMSG_USER, "num reads on queue %d\n", rq_reqs.count); - logmsg(LOGMSG_USER, "num threads wrt busy/busy/idle %d/%d/%d\n", write_thd_count, - busy.count, idle.count); - logmsg(LOGMSG_USER, "num threads in i/o %d\n", iothreads); - logmsg(LOGMSG_USER, "num threads waiting %d\n", waitthreads); + logmsg(LOGMSG_USER, "num reqs %d\n", THDPOOLS_COUNTER(passed) + THDPOOLS_COUNTER(enqueued)); + logmsg(LOGMSG_USER, "num waits %d\n", 
THDPOOLS_COUNTER(enqueued)); + logmsg(LOGMSG_USER, "num items on queue %d\n", THDPOOLS_COUNTER(queue_depth)); + logmsg(LOGMSG_USER, "num reads on queue %d\n", thdpool_get_queue_depth(gbl_handle_buf_read_thdpool)); + logmsg(LOGMSG_USER, "num threads wrt busy/busy/idle %d/%d/%d\n", THDPOOLS_WRITER_COUNTER(nbusythds), + thdpool_get_nbusythds(gbl_handle_buf_read_thdpool), THDPOOLS_COUNTER(nfreethds)); logmsg(LOGMSG_USER, "---\n"); logmsg(LOGMSG_USER, "config:MAXTHREADS %d\n", gbl_maxthreads); logmsg(LOGMSG_USER, "config:MAXWRTTHREADS %d\n", gbl_maxwthreads); @@ -243,197 +145,112 @@ void thd_stats(void) logmsg(LOGMSG_USER, "penaltyincpercent %d\n", gbl_penaltyincpercent); logmsg(LOGMSG_USER, "config:MAXQUEUE %d\n", gbl_maxqueue); logmsg(LOGMSG_USER, "---\n"); - logmsg(LOGMSG_USER, "num queue fulls %d\n", nqfulls); + logmsg(LOGMSG_USER, "num queue fulls %d\n", THDPOOLS_COUNTER(failed_dispatches)); logmsg(LOGMSG_USER, "num errors %d\n", nerrs); - logmsg(LOGMSG_USER, "num thread creates %d\n", nthdcreates); - logmsg(LOGMSG_USER, "num retires %d\n", nretire); + logmsg(LOGMSG_USER, "num thread creates %d\n", THDPOOLS_COUNTER(creates)); + logmsg(LOGMSG_USER, "num retires %d\n", THDPOOLS_COUNTER(exits)); logmsg(LOGMSG_USER, "---\n"); - logmsg(LOGMSG_USER, "#threads:count\n"); - for (jj = 0, ii = 0; ii < MAXSTAT; ii++) { - if (ii < 4 || bkt_thd[ii] > 0) { - logmsg(LOGMSG_USER, " %-3d:%-8d", ii + 1, bkt_thd[ii]); - jj++; - if ((jj % 4) == 0) - logmsg(LOGMSG_USER, "\n"); - } - } - if ((jj % 4) != 0) - logmsg(LOGMSG_USER, "\n"); - logmsg(LOGMSG_USER, "#queue:count\n"); - for (jj = 0, ii = 0; ii < MAXSTAT; ii++) { - if (ii < 4 || bkt_queue[ii] > 0) { - logmsg(LOGMSG_USER, " %-3d:%-8d", ii + 1, bkt_queue[ii]); - jj++; - if ((jj % 4) == 0) - logmsg(LOGMSG_USER, "\n"); - } - } - if ((jj % 4) != 0) - logmsg(LOGMSG_USER, "\n"); + logmsg(LOGMSG_USER, "Use to view busy thread info\n"); } void thd_dbinfo2_stats(struct db_info2_stats *stats) { - int ii; - uint32_t queue_sum = 0, queue_mean 
= 0; stats->thr_max = gbl_maxthreads; stats->thr_maxwr = gbl_maxwthreads; - stats->thr_cur = nthdcreates - nretire; + stats->thr_cur = THDPOOLS_COUNTER(nthds); stats->q_max_conf = gbl_maxqueue; - for (ii = 0; ii < MAXSTAT; ii++) { - if (bkt_queue[ii] > 0) { - stats->q_max_reached = ii + 1; - queue_mean += bkt_queue[ii] * stats->q_max_reached; - queue_sum += bkt_queue[ii]; - } - } - if (stats->q_max_reached > 0) { - double f = (double)queue_mean / (double)queue_sum; - f += 0.5; - stats->q_mean_reached = (uint32_t)f; - } else - stats->q_mean_reached = 0; + stats->q_max_reached = THDPOOLS_COUNTER(enqueued); + stats->q_mean_reached = 0; } -static void thd_coalesce_check_ll(void) +int thd_queue_depth(void) { - if (coalesce_waiters && busy.count <= coalesce_reqthd_waiters && - q_reqs.count == 0) { - Pthread_cond_broadcast(&coalesce_wakeup); - } + return THDPOOLS_COUNTER(queue_depth); } -static void thd_dump_nolock(void) +void thd_coalesce(struct dbenv *dbenv) { - struct thd *thd; - uint64_t nowus; - int opc, cnt = 0; - nowus = comdb2_time_epochus(); - - { - for (thd = busy.top; thd; thd = thd->lnk.next) { - cnt++; - opc = thd->iq->opcode; - logmsg(LOGMSG_USER, - "busy tid %p time %5d ms %-6s (%-3d) " - "%-20s where %s %s\n", - (void *)thd->tid, U2M(nowus - thd->iq->nowus), req2a(opc), opc, getorigin(thd->iq), thd->iq->where, - thd->iq->gluewhere); - } - - for (thd = idle.top; thd; thd = thd->lnk.next) { - cnt++; - logmsg(LOGMSG_USER, "idle tid %p \n", (void *)thd->tid); - } - } - - if (cnt == 0) - logmsg(LOGMSG_USER, "no active threads\n"); + thdpool_stop(gbl_handle_buf_read_thdpool); + thdpool_stop(gbl_handle_buf_write_thdpool); + thdpool_stop(gbl_handle_buf_queue_thdpool); } -int thd_queue_depth(void) +void dump_a_thd(struct thdpool *pool, pthread_t tid, int idle, void *thddata, void *user) { - return q_reqs.count; -} + int *pcnt = (int *)user; + struct thd *thd = thddata; + struct ireq *iq; + uint64_t nowus; -void thd_coalesce(struct dbenv *dbenv) -{ - LOCK(&lock) - { 
- struct thd *thd; - int am_req_thd = 0; - int num_wait = 0; - - /* fstsnd based fastinit can lead to us waiting for ourself.. check if - * this is one of the request threads and if so that's one less - * thread to wait for. */ - LISTC_FOR_EACH(&busy, thd, lnk) - { - if (thd->tid == pthread_self()) { - am_req_thd = 1; - break; - } - } - coalesce_waiters++; - coalesce_reqthd_waiters += am_req_thd; - while (busy.count > coalesce_reqthd_waiters || q_reqs.count > 0) { - int rc; - struct timespec ts; - - ++num_wait; - logmsg(LOGMSG_USER, "waiting for threads %d/%d/%d num queue %d\n", - write_thd_count, busy.count, idle.count, q_reqs.count); - if (num_wait > 5) - thd_dump_nolock(); - rc = clock_gettime(CLOCK_REALTIME, &ts); - ts.tv_sec += 1; - rc = pthread_cond_timedwait(&coalesce_wakeup, &lock, &ts); - if (rc != 0 && rc != ETIMEDOUT) - logmsg(LOGMSG_ERROR, "%s:pthread_cond_timedwait: %d %s\n", __func__, - rc, strerror(rc)); - } - coalesce_waiters--; - coalesce_reqthd_waiters -= am_req_thd; + ++(*pcnt); + nowus = comdb2_time_epochus(); + + if (idle) + logmsg(LOGMSG_USER, "idle tid %p \n", (void *)tid); + else { + iq = thd->iq; + logmsg(LOGMSG_USER, "busy tid %p time %5d ms %-6s (%-3d) %-20s where %s %s\n", + (void *)tid, U2M(nowus - iq->nowus), req2a(iq->opcode), iq->opcode, + getorigin(iq), iq->where, iq->gluewhere); } - UNLOCK(&lock); } void thd_dump(void) { - struct thd *thd; - uint64_t nowus; int cnt = 0; - nowus = comdb2_time_epochus(); - LOCK(&lock) - { - for (thd = busy.top; thd; thd = thd->lnk.next) { - cnt++; - logmsg(LOGMSG_USER, - "busy tid %p time %5d ms %-6s (%-3d) %-20s where %s " - "%s\n", - (void *)thd->tid, U2M(nowus - thd->iq->nowus), req2a(thd->iq->opcode), thd->iq->opcode, - getorigin(thd->iq), thd->iq->where, thd->iq->gluewhere); - } - for (thd = idle.top; thd; thd = thd->lnk.next) { - cnt++; - logmsg(LOGMSG_USER, "idle tid %p \n", (void *)thd->tid); - } - } - UNLOCK(&lock); + thdpool_for_each_thd(gbl_handle_buf_write_thdpool, dump_a_thd, &cnt); + 
thdpool_for_each_thd(gbl_handle_buf_queue_thdpool, dump_a_thd, &cnt); + thdpool_for_each_thd(gbl_handle_buf_read_thdpool, dump_a_thd, &cnt); + if (cnt == 0) logmsg(LOGMSG_USER, "no active threads\n"); } -int get_thd_info(thd_info **data, int *npoints) { +struct thds_info { struct thd_info *tinfo; - struct thd *thd; + int max; + int n; +}; + +void collect_a_thd(struct thdpool *pool, pthread_t tid, int idle, void *thddata, void *user) +{ + struct thd_info *tinfo; + struct thd *thd = (struct thd*)thddata; + struct ireq *iq = thd->iq; + struct thds_info *thds_info = (struct thds_info *)user; uint64_t nowus; nowus = comdb2_time_epochus(); - LOCK(&lock) - { - *npoints = busy.count + idle.count; - *data = tinfo = malloc((*npoints)*sizeof(thd_info)); - - LISTC_FOR_EACH(&busy, thd, lnk) - { - tinfo->state = strdup("busy"); - tinfo->time = U2M(nowus - thd->iq->nowus); - tinfo->machine = strdup(thd->iq->frommach); - tinfo->opcode = strdup(thd->iq->where); - tinfo->function = strdup(thd->iq->gluewhere); - tinfo->isIdle = 0; - ++tinfo; - } - LISTC_FOR_EACH(&idle, thd, lnk) - { - tinfo->state = strdup("idle"); - tinfo->isIdle = 1; - ++tinfo; - } + if (thds_info->n == thds_info->max) + return; + + tinfo = &thds_info->tinfo[thds_info->n]; + if (idle) { + tinfo->state = strdup("idle"); + tinfo->isIdle = 1; + } else { + tinfo->state = strdup("busy"); + tinfo->time = U2M(nowus - iq->nowus); + tinfo->machine = strdup(iq->frommach); + tinfo->opcode = strdup(iq->where); + tinfo->function = strdup(iq->gluewhere); + tinfo->isIdle = 0; } - UNLOCK(&lock); + ++thds_info->n; +} + +int get_thd_info(thd_info **data, int *npoints) +{ + struct thds_info thds_info = {0}; + thds_info.max = THDPOOLS_COUNTER(nthds); + thds_info.tinfo = malloc(thds_info.max * sizeof(thd_info)); + + thdpool_for_each_thd(gbl_handle_buf_write_thdpool, collect_a_thd, &thds_info); + + *npoints = thds_info.n; + *data = thds_info.tinfo; + return 0; } @@ -485,339 +302,122 @@ int signal_buflock(struct buf_lock_t *p_slock) } /* 
request handler */ -void *thd_req(void *vthd) +void thd_req(void *vthd) { comdb2_name_thread(__func__); struct thd *thd = (struct thd *)vthd; - struct dbenv *dbenv; - struct timespec ts; - int rc; - int iamwriter = 0; - struct thread_info *thdinfo = NULL; + struct thread_info *thdinfo = pthread_getspecific(thd_info_key); struct thr_handle *thr_self; struct reqlogger *logger; - int numwriterthreads; - - if (!thd->inited) { - if (thd->do_inline) { - thd->thr_self = thrman_self(); - } - else { - thd->thr_self = thrman_register(THRTYPE_REQ); - thread_started("request"); - } - - - ENABLE_PER_THREAD_MALLOC(__func__); - - dbenv = thd->iq->dbenv; - - // This was already called in the thread that's calling this code if we're called - // inline. If we're called as a start routine of a new thread, we need to call it - // ourselves. - if (!thd->do_inline) - backend_thread_event(dbenv, COMDB2_THR_EVENT_START_RDWR); - - /* thdinfo is assigned to thread specific variable thd_info_key which - * will automatically free it when the thread exits. 
*/ - thdinfo = malloc(sizeof(struct thread_info)); - if (thdinfo == NULL) { - logmsg(LOGMSG_FATAL, "**aborting due malloc failure thd %p\n", (void *)pthread_self()); - abort(); - } - thdinfo->uniquetag = 0; - thdinfo->ct_id_key = 0LL; - - thdinfo->ct_add_table = create_constraint_table(); - if (thdinfo->ct_add_table == NULL) { - logmsg(LOGMSG_FATAL, - "**aborting: cannot allocate constraint add table thd " - "%p\n", - (void *)pthread_self()); - abort(); - } - thdinfo->ct_del_table = create_constraint_table(); - if (thdinfo->ct_del_table == NULL) { - logmsg(LOGMSG_FATAL, - "**aborting: cannot allocate constraint delete table " - "thd %p\n", - (void *)pthread_self()); - abort(); - } - thdinfo->ct_add_index = create_constraint_index_table(); - if (thdinfo->ct_add_index == NULL) { - logmsg(LOGMSG_FATAL, - "**aborting: cannot allocate constraint add index table " - "thd %p\n", - (void *)pthread_self()); - abort(); - } - thdinfo->ct_add_table_genid_hash = hash_init(sizeof(unsigned long long)); - thdinfo->ct_add_table_genid_pool = - pool_setalloc_init(sizeof(unsigned long long), 0, malloc, free); - - /* Initialize the sql statement cache */ - thdinfo->stmt_cache = stmt_cache_new(NULL); - if (thdinfo->stmt_cache == NULL) { - logmsg(LOGMSG_ERROR, "%s:%d failed to create sql statement cache\n", - __func__, __LINE__); - } + struct ireq *iq; - Pthread_setspecific(thd_info_key, thdinfo); - thd->inited = 1; - } thr_self = thd->thr_self; - logger = thrman_get_reqlogger(thr_self); - - /*printf("started handler %ld thd %p thd->id %p\n", pthread_self(), thd, - * thd->tid);*/ - do { - if (thd->tid != pthread_self()) /*sanity check*/ - { - logmsg(LOGMSG_FATAL, "**aborting due thd_req mismatch thd id %p (my thd %p)\n", (void *)thd->tid, - (void *)pthread_self()); - abort(); - } - thd->iq->startus = comdb2_time_epochus(); - thd->iq->where = "executing"; - /*PROCESS REQUEST*/ - thd->iq->reqlogger = logger; - iamwriter = is_req_write(thd->iq->opcode) ? 
1 : 0; - dbenv = thd->iq->dbenv; - thrman_where(thr_self, req2a(thd->iq->opcode)); - thrman_origin(thr_self, getorigin(thd->iq)); - user_request_begin(REQUEST_TYPE_QTRAP, FLAG_REQUEST_TRACK_EVERYTHING); - handle_ireq(thd->iq); - if (debug_this_request(gbl_debug_until) || + iq = thd->iq; + + iq->startus = comdb2_time_epochus(); + iq->where = "executing"; + /*PROCESS REQUEST*/ + iq->reqlogger = logger; + thrman_where(thr_self, req2a(iq->opcode)); + thrman_origin(thr_self, getorigin(iq)); + user_request_begin(REQUEST_TYPE_QTRAP, FLAG_REQUEST_TRACK_EVERYTHING); + handle_ireq(iq); + if (debug_this_request(gbl_debug_until) || (gbl_who > 0 && !gbl_sdebug)) { - struct per_request_stats *st; - st = user_request_get_stats(); - if (st) - logmsg(LOGMSG_USER, "nreads %d (%lld bytes) nwrites %d (%lld bytes) nfsyncs " - "%d nmempgets %d\n", - st->nreads, st->readbytes, st->nwrites, st->writebytes, - st->nfsyncs, st->mempgets); - } - thread_util_donework(); - thrman_origin(thr_self, NULL); - thrman_where(thr_self, "idle"); - thd->iq->where = "done executing"; - - // before acquiring next request, yield - comdb2bma_yield_all(); - - if (thd->do_inline) - return NULL; + struct per_request_stats *st; + st = user_request_get_stats(); + if (st) + logmsg(LOGMSG_USER, "nreads %d (%lld bytes) nwrites %d (%lld bytes) nfsyncs " + "%d nmempgets %d\n", + st->nreads, st->readbytes, st->nwrites, st->writebytes, + st->nfsyncs, st->mempgets); + } + thread_util_donework(); + thrman_origin(thr_self, NULL); + thrman_where(thr_self, "idle"); + iq->where = "done executing"; - /*NEXT REQUEST*/ - LOCK(&lock) - { - struct dbq_entry_t *nxtrq = NULL; - int newrqwriter = 0; + if (thd->do_inline) { + free(iq); + return; + } - if (iamwriter) { - write_thd_count--; - } + /* before acquiring next request, yield */ + comdb2bma_yield_all(); - if (thd->iq->usedb && thd->iq->ixused >= 0 && - thd->iq->ixused < thd->iq->usedb->nix && - thd->iq->usedb->ixuse) { - thd->iq->usedb->ixuse[thd->iq->ixused] += 
thd->iq->ixstepcnt; - } - thd->iq->ixused = -1; - thd->iq->ixstepcnt = 0; + /* clean up and stats */ + if (iq->usedb && iq->ixused >= 0 && + iq->ixused < iq->usedb->nix && + iq->usedb->ixuse) { + iq->usedb->ixuse[iq->ixused] += iq->ixstepcnt; + } + iq->ixused = -1; + iq->ixstepcnt = 0; - if (thd->iq->dbglog_file) { - sbuf2close(thd->iq->dbglog_file); - thd->iq->dbglog_file = NULL; - } - if (thd->iq->nwrites) { - free(thd->iq->nwrites); - thd->iq->nwrites = NULL; - } - if (thd->iq->vfy_genid_hash) { - hash_free(thd->iq->vfy_genid_hash); - thd->iq->vfy_genid_hash = NULL; - } - if (thd->iq->vfy_genid_pool) { - pool_free(thd->iq->vfy_genid_pool); - thd->iq->vfy_genid_pool = NULL; - } - thd->iq->vfy_genid_track = 0; - if (thd->iq->vfy_idx_hash) { - destroy_hash(thd->iq->vfy_idx_hash, free_it); - thd->iq->vfy_idx_hash = NULL; - } - thd->iq->vfy_idx_track = 0; - thd->iq->dup_key_insert = 0; -#if 0 - fprintf(stderr, "%s:%d: THD=%p relablk iq=%p\n", __func__, __LINE__, pthread_self(), thd->iq); -#endif - pool_relablk(p_reqs, thd->iq); /* this request is done, so release - * resource. */ - /* get next item off hqueue */ - nxtrq = (struct dbq_entry_t *)listc_rtl(&q_reqs); - thd->iq = 0; - if (nxtrq != 0) { - thd->iq = nxtrq->obj; - newrqwriter = is_req_write(thd->iq->opcode) ? 
1 : 0; - - numwriterthreads = gbl_maxwthreads - gbl_maxwthreadpenalty; - if (numwriterthreads < 1) - numwriterthreads = 1; - - if (newrqwriter && - (write_thd_count - iothreads) >= numwriterthreads) { - /* dont process next request as it goes over - the write limit..put it back on queue and grab - next read */ - listc_atl(&q_reqs, nxtrq); - nxtrq = (struct dbq_entry_t *)listc_rtl(&rq_reqs); - if (nxtrq != NULL) { - listc_rfl(&q_reqs, nxtrq); - /* release the memory block of the link */ - thd->iq = nxtrq->obj; - pool_relablk(pq_reqs, nxtrq); - newrqwriter = 0; - } else { - thd->iq = 0; - } - } else { - if (!newrqwriter) { - /*get rid of new request from read queue */ - listc_rfl(&rq_reqs, nxtrq); - } - /* release the memory block of the link */ - pool_relablk(pq_reqs, nxtrq); - } - if (newrqwriter && thd->iq != 0) { - write_thd_count++; - } - } - if (thd->iq == 0) { - /*wait for something to do, or go away after a while */ - listc_rfl(&busy, thd); - thd_coalesce_check_ll(); - - listc_atl(&idle, thd); - - rc = clock_gettime(CLOCK_REALTIME, &ts); - if (rc != 0) { - logmsg(LOGMSG_ERROR, "thd_req:clock_gettime bad rc %d:%s\n", rc, - strerror(errno)); - memset(&ts, 0, sizeof(ts)); /*force failure later*/ - } - - rc = 0; - int ii = 0; - do { /* wait gbl_thd_linger seconds via one second increments */ - ts.tv_sec += 1; - ii++; - /*waitft thread will deposit a request in thd->iq*/ - rc = pthread_cond_timedwait(&thd->wakeup, &lock, &ts); - } while ((thd->iq == 0 && rc == 0) || - (rc == ETIMEDOUT && ii < gbl_thd_linger && !db_is_exiting())); - - if (rc != 0 && rc != ETIMEDOUT) { - logmsg(LOGMSG_ERROR, "thd_req:pthread_cond_timedwait failed:%s\n", - strerror(rc)); - /* error'd out, so i still have lock: errLOCK(&lock);*/ - } - if (thd->iq == 0) /*nothing to do. this thread retires.*/ - { - nretire++; - listc_rfl(&idle, thd); - Pthread_cond_destroy(&thd->wakeup); - thd->tid = (pthread_t)-2; /*returned. 
this is just for info & debugging*/ - pool_relablk(p_thds, thd); /*release this struct*/ - /**/ - retUNLOCK(&lock); - /**/ - /*printf("ending handler %p\n", pthread_self());*/ - delete_constraint_table(thdinfo->ct_add_table); - delete_constraint_table(thdinfo->ct_del_table); - delete_constraint_table(thdinfo->ct_add_index); - hash_free(thdinfo->ct_add_table_genid_hash); - if (thdinfo->ct_add_table_genid_pool) { - pool_free(thdinfo->ct_add_table_genid_pool); - } - delete_defered_index_tbl(); - backend_thread_event(dbenv, COMDB2_THR_EVENT_DONE_RDWR); - return 0; - } - } - thd_coalesce_check_ll(); - } - UNLOCK(&lock); - - // TODO: reuse - /* Should not be done under lock - might be expensive */ - truncate_constraint_table(thdinfo->ct_add_table); - truncate_constraint_table(thdinfo->ct_del_table); - truncate_constraint_table(thdinfo->ct_add_index); - hash_clear(thdinfo->ct_add_table_genid_hash); - if (thdinfo->ct_add_table_genid_pool) { - pool_clear(thdinfo->ct_add_table_genid_pool); - } - truncate_defered_index_tbl(); - } while (1); + if (iq->dbglog_file) { + sbuf2close(iq->dbglog_file); + iq->dbglog_file = NULL; + } + if (iq->nwrites) { + free(iq->nwrites); + iq->nwrites = NULL; + } + if (iq->vfy_genid_hash) { + hash_free(iq->vfy_genid_hash); + iq->vfy_genid_hash = NULL; + } + if (iq->vfy_genid_pool) { + pool_free(iq->vfy_genid_pool); + iq->vfy_genid_pool = NULL; + } + iq->vfy_genid_track = 0; + if (iq->vfy_idx_hash) { + destroy_hash(iq->vfy_idx_hash, free_it); + iq->vfy_idx_hash = NULL; + } + iq->vfy_idx_track = 0; + iq->dup_key_insert = 0; + free(iq); + thd->iq = 0; + + // TODO: reuse + /* Should not be done under lock - might be expensive */ + truncate_constraint_table(thdinfo->ct_add_table); + truncate_constraint_table(thdinfo->ct_del_table); + truncate_constraint_table(thdinfo->ct_add_index); + hash_clear(thdinfo->ct_add_table_genid_hash); + if (thdinfo->ct_add_table_genid_pool) { + pool_clear(thdinfo->ct_add_table_genid_pool); + } + 
truncate_defered_index_tbl(); } void thd_req_inline(struct ireq *iq) { struct thd inlinerq = {0}; // TODO: reuse the constraint tables, etc inlinerq.do_inline = 1; - inlinerq.inited = 0; - inlinerq.tid = pthread_self(); inlinerq.iq = iq; - thd_req(&inlinerq); - struct thread_info *thdinfo = pthread_getspecific(thd_info_key); - delete_constraint_table(thdinfo->ct_add_table); - delete_constraint_table(thdinfo->ct_del_table); - delete_constraint_table(thdinfo->ct_add_index); - hash_free(thdinfo->ct_add_table_genid_hash); - if (thdinfo->ct_add_table_genid_pool) { - pool_free(thdinfo->ct_add_table_genid_pool); - } - delete_defered_index_tbl(); + handle_buf_thd_start(NULL, (void *)&inlinerq); + thd_req(&inlinerq); + handle_buf_thd_stop(NULL, (void *)&inlinerq); } /* sndbak error code & return resources.*/ static int reterr(intptr_t curswap, struct thd *thd, struct ireq *iq, int rc) /* 040307dh: 64bits */ { - if (thd || iq) { - LOCK(&lock) - { - if (thd) { - if (thd->iq) { - int iamwriter = 0; - iamwriter = is_req_write(thd->iq->opcode) ? 
1 : 0; - listc_rfl(&busy, thd); /*this means busy*/ - thd_coalesce_check_ll(); - if (iamwriter) { - write_thd_count--; - } - } - thd->iq = 0; - thd->tid = (pthread_t)-1; - pool_relablk(p_thds, thd); - } - if (iq) { - if (iq->is_fromsocket) { - if (iq->is_socketrequest) { - sndbak_open_socket(iq->sb, NULL, 0, ERR_INTERNAL); - } else { - sndbak_socket(iq->sb, NULL, 0, ERR_INTERNAL); - iq->sb = NULL; - } - } - pool_relablk(p_reqs, iq); - } + if (thd) + thd->iq = 0; + if (iq && iq->is_fromsocket) { + if (iq->is_socketrequest) { + sndbak_open_socket(iq->sb, NULL, 0, ERR_INTERNAL); + } else { + sndbak_socket(iq->sb, NULL, 0, ERR_INTERNAL); + iq->sb = NULL; } - UNLOCK(&lock); + free(iq); } if (iq && iq->ipc_sndbak) { iq->ipc_sndbak(iq, rc, iq->p_buf_out_end - iq->p_buf_out_start); @@ -856,15 +456,7 @@ static int reterr_withfree(struct ireq *iq, int rc) iq->p_buf_out_end = iq->p_buf_out_start = iq->p_buf_out = NULL; iq->p_buf_in_end = iq->p_buf_in = NULL; - LOCK(&lock) - { -#if 0 - fprintf(stderr, "%s:%d: THD=%p relablk iq=%p\n", __func__, __LINE__, pthread_self(), iq); -#endif - pool_relablk(p_reqs, iq); - } - UNLOCK(&lock); - + free(iq); return 0; } else { return reterr(iq->curswap, NULL, iq, rc); @@ -921,9 +513,7 @@ int handle_buf(struct dbenv *dbenv, uint8_t *p_buf, const uint8_t *p_buf_end, NULL, NULL, REQ_WAITFT, NULL, 0, 0, NULL); } -int handled_queue; - -int q_reqs_len(void) { return q_reqs.count; } +int q_reqs_len(void) { return THDPOOLS_COUNTER(queue_depth); } static int init_ireq_legacy(struct dbenv *dbenv, struct ireq *iq, SBUF2 *sb, uint8_t *p_buf, const uint8_t *p_buf_end, int debug, @@ -1048,6 +638,137 @@ static int init_ireq_legacy(struct dbenv *dbenv, struct ireq *iq, SBUF2 *sb, int gbl_handle_buf_add_latency_ms = 0; +struct thdpool *gbl_handle_buf_write_thdpool; +struct thdpool *gbl_handle_buf_read_thdpool; +struct thdpool *gbl_handle_buf_queue_thdpool; + +static void handle_buf_work_pp(struct thdpool *pool, void *work, void *thddata, int op) +{ + struct 
ireq *iq = work; + struct thd *thd = thddata; + + switch (op) { + case THD_RUN: + thd->iq = iq; + thd_req(thd); + break; + default: + break; + } +} + +static void handle_buf_thd_start(struct thdpool *pool, void *thddata) +{ + struct thd *thd = thddata; + struct thread_info *thdinfo; + + if (pool == NULL) { + thd->thr_self = thrman_self(); + } else { + thd->thr_self = thrman_register(THRTYPE_REQ); + } + + // This was already called in the thread that's calling this code if we're called + // inline. If we're called as a start routine of a new thread, we need to call it + // ourselves. + if (pool != NULL) + backend_thread_event(thedb, COMDB2_THR_EVENT_START_RDWR); + + /* thdinfo is assigned to thread specific variable thd_info_key which + * will automatically free it when the thread exits. */ + thdinfo = malloc(sizeof(struct thread_info)); + if (thdinfo == NULL) { + logmsg(LOGMSG_FATAL, "**aborting due malloc failure thd %p\n", (void *)pthread_self()); + abort(); + } + thdinfo->uniquetag = 0; + thdinfo->ct_id_key = 0LL; + + thdinfo->ct_add_table = create_constraint_table(); + if (thdinfo->ct_add_table == NULL) { + logmsg(LOGMSG_FATAL, + "**aborting: cannot allocate constraint add table thd %p\n", + (void *)pthread_self()); + abort(); + } + thdinfo->ct_del_table = create_constraint_table(); + if (thdinfo->ct_del_table == NULL) { + logmsg(LOGMSG_FATAL, + "**aborting: cannot allocate constraint delete table thd %p\n", + (void *)pthread_self()); + abort(); + } + thdinfo->ct_add_index = create_constraint_index_table(); + if (thdinfo->ct_add_index == NULL) { + logmsg(LOGMSG_FATAL, + "**aborting: cannot allocate constraint add index table thd %p\n", + (void *)pthread_self()); + abort(); + } + thdinfo->ct_add_table_genid_hash = hash_init(sizeof(unsigned long long)); + thdinfo->ct_add_table_genid_pool = pool_setalloc_init(sizeof(unsigned long long), 0, malloc, free); + + /* Initialize the sql statement cache */ + thdinfo->stmt_cache = stmt_cache_new(NULL); + if 
(thdinfo->stmt_cache == NULL) { + logmsg(LOGMSG_ERROR, "%s:%d failed to create sql statement cache\n", + __func__, __LINE__); + } + + Pthread_setspecific(thd_info_key, thdinfo); +} + +static void handle_buf_thd_stop(struct thdpool *pool, void *thddata) +{ + struct thread_info *thdinfo = pthread_getspecific(thd_info_key); + + delete_constraint_table(thdinfo->ct_add_table); + delete_constraint_table(thdinfo->ct_del_table); + delete_constraint_table(thdinfo->ct_add_index); + hash_free(thdinfo->ct_add_table_genid_hash); + if (thdinfo->ct_add_table_genid_pool) { + pool_free(thdinfo->ct_add_table_genid_pool); + } + delete_defered_index_tbl(); + + if (pool != NULL) + backend_thread_event(thedb, COMDB2_THR_EVENT_DONE_RDWR); +} + +static void handle_buf_thd_dequeue(struct thdpool *pool, struct workitem *item, int timeout) +{ + if (gbl_handle_buf_add_latency_ms > 0) + poll(0, 0, rand() % gbl_handle_buf_add_latency_ms); + + time_metric_add(thedb->handle_buf_queue_time, comdb2_time_epochms() - item->queue_time_ms); + int nqueued = THDPOOLS_COUNTER(queue_depth) + thdpool_get_queue_depth(get_default_sql_pool(0)); + time_metric_add(thedb->queue_depth, nqueued); +} + +static struct thdpool *create_handle_buf_thdpool(const char *name, int nthds) +{ + struct thdpool *pool = thdpool_create(name, sizeof(struct thd)); + if (pool != NULL) { + thdpool_set_stack_size(pool, 4 * 1024 * 1024); + thdpool_set_init_fn(pool, handle_buf_thd_start); + thdpool_set_delt_fn(pool, handle_buf_thd_stop); + thdpool_set_dque_fn(pool, handle_buf_thd_dequeue); + thdpool_set_minthds(pool, 0); + thdpool_set_maxthds(pool, nthds); + thdpool_set_linger(pool, gbl_thd_linger); + thdpool_set_maxqueue(pool, gbl_maxqueue); + } + return pool; +} + +int gbl_queue_use_dedicated_writers = 1; +int gbl_queue_max_dedicated_writers = 16; +void handle_buf_set_queue_thdpool_maxthds(int max) +{ + if (max > thdpool_get_maxthds(gbl_handle_buf_queue_thdpool) && max <= gbl_queue_max_dedicated_writers) + 
thdpool_set_maxthds(gbl_handle_buf_queue_thdpool, max); +} + int handle_buf_main2(struct dbenv *dbenv, SBUF2 *sb, const uint8_t *p_buf, const uint8_t *p_buf_end, int debug, char *frommach, int frompid, char *fromtask, osql_sess_t *sorese, @@ -1056,12 +777,9 @@ int handle_buf_main2(struct dbenv *dbenv, SBUF2 *sb, const uint8_t *p_buf, int comdbg_flags, void (*iq_setup_func)(struct ireq*, void *setup_data), void *setup_data, int doinline, void* authdata) { + struct thdpool *pool = NULL; struct ireq *iq = NULL; - int rc, num, ndispatch, iamwriter = 0; - int add_latency = gbl_handle_buf_add_latency_ms; - struct thd *thd; - int numwriterthreads; - struct dbq_entry_t *newent = NULL; + int rc; if (db_is_exiting()) { return reterr(curswap, 0, NULL, ERR_REJECTED); @@ -1072,266 +790,55 @@ int handle_buf_main2(struct dbenv *dbenv, SBUF2 *sb, const uint8_t *p_buf, net_delay(frommach); - ndispatch = 0; - nreqs++; - if (gbl_who > 0) { --gbl_who; debug = 1; } + iq = malloc(sizeof(struct ireq)); + if (!iq) { + logmsg(LOGMSG_ERROR, "handle_buf:failed allocate req\n"); + return reterr(curswap, 0, iq, ERR_INTERNAL); + } -#if 0 - fprintf(stderr, "%s:%d: THD=%p getablk iq=%p\n", __func__, __LINE__, pthread_self(), iq); -#endif - - /* allocate a request for later dispatch to available thread */ - LOCK(&lock) - { - iq = (struct ireq *)pool_getablk(p_reqs); - } - UNLOCK(&lock); - if (!iq) { - logmsg(LOGMSG_ERROR, "handle_buf:failed allocate req\n"); - return reterr(curswap, 0, iq, ERR_INTERNAL); - } - - rc = init_ireq_legacy(dbenv, iq, sb, (uint8_t *)p_buf, p_buf_end, debug, - frommach, frompid, fromtask, sorese, qtype, - data_hndl, luxref, rqid, p_sinfo, curswap, comdbg_flags); - if (rc) { - logmsg(LOGMSG_ERROR, "handle_buf:failed to unpack req header\n"); - return reterr(curswap, /*thd*/ 0, iq, rc); - } - iq->sorese = sorese; - if (iq_setup_func) - iq_setup_func(iq, setup_data); - - if (iq->comdbg_flags == -1) - iq->comdbg_flags = 0; - - if (p_buf && p_buf[7] == OP_FWD_BLOCK_LE) - 
iq->comdbg_flags |= COMDBG_FLAG_FROM_LE; - iq->authdata = authdata; - - if (doinline) { - thd_req_inline(iq); - return 0; - } - - - Pthread_mutex_lock(&lock); - { - ++handled_queue; - - /*count queue*/ - num = q_reqs.count; - if (num >= MAXSTAT) - num = MAXSTAT - 1; - bkt_queue[num]++; - - /*while ((idle.top || busy.count < gbl_maxthreads) - * && (iq = queue_next(q_reqs)))*/ - newent = (struct dbq_entry_t *)pool_getablk(pq_reqs); - if (newent == NULL) { - errUNLOCK(&lock); - logmsg(LOGMSG_ERROR, - "handle_buf:failed to alloc new queue entry, rc %d\n", - rc); - return reterr(curswap, /*thd*/ 0, iq, ERR_REJECTED); - } - newent->obj = (void *)iq; - iamwriter = is_req_write(iq->opcode) ? 1 : 0; - newent->queue_time_ms = comdb2_time_epochms(); - if (!iamwriter) { - (void)listc_abl(&rq_reqs, newent); - } - - /*add to global queue*/ - (void)listc_abl(&q_reqs, newent); - /* dispatch work ...*/ - iq->where = "enqueued"; - - while (busy.count - iothreads < gbl_maxthreads) { - struct dbq_entry_t *nextrq = NULL; - nextrq = (struct dbq_entry_t *)listc_rtl(&q_reqs); - if (nextrq == NULL) - break; - iq = nextrq->obj; - iamwriter = is_req_write(iq->opcode) ? 
1 : 0; - - numwriterthreads = gbl_maxwthreads - gbl_maxwthreadpenalty; - if (numwriterthreads < 1) - numwriterthreads = 1; - - if (iamwriter && - (write_thd_count - iothreads) >= numwriterthreads) { - /* i am invalid writer, check the read queue instead */ - listc_atl(&q_reqs, nextrq); - - nextrq = (struct dbq_entry_t *)listc_rtl(&rq_reqs); - if (nextrq == NULL) - break; - iq = nextrq->obj; - /* remove from global list, and release link block of reader*/ - listc_rfl(&q_reqs, nextrq); - if (add_latency > 0) { - poll(0, 0, rand() % add_latency); - } - time_metric_add(thedb->handle_buf_queue_time, comdb2_time_epochms() - nextrq->queue_time_ms); - time_metric_add(thedb->queue_depth, - q_reqs.count + thdpool_get_queue_depth(get_default_sql_pool(0))); - pool_relablk(pq_reqs, nextrq); - if (!iq) - /* this should never be hit */ - break; - /* make sure to mark the reader request accordingly */ - iamwriter = 0; - } else { - /* i am reader or valid writer */ - if (!iamwriter) { - /* remove reader from read queue */ - listc_rfl(&rq_reqs, nextrq); - } - if (add_latency > 0) { - poll(0, 0, rand() % add_latency); - } - time_metric_add(thedb->handle_buf_queue_time, comdb2_time_epochms() - nextrq->queue_time_ms); - time_metric_add(thedb->queue_depth, - q_reqs.count + thdpool_get_queue_depth(get_default_sql_pool(0))); - /* release link block */ - pool_relablk(pq_reqs, nextrq); - if (!iq) { - /* this should never be hit */ - abort(); - break; - } - } - if ((thd = listc_rtl(&idle)) != NULL) /*try to find an idle thread*/ - { -#if 0 - printf("%s:%d: thdpool FOUND THD=%p -> newTHD=%d iq=%p\n", __func__, __LINE__, pthread_self(), thd->tid, iq); -#endif - thd->iq = iq; - thd->inited = 0; - iq->where = "dispatched"; - num = busy.count; - listc_abl(&busy, thd); - if (iamwriter) { - write_thd_count++; - } - if (num >= MAXSTAT) - num = MAXSTAT - 1; - bkt_thd[num]++; /*count threads*/ - Pthread_cond_signal(&thd->wakeup); - ndispatch++; - } else /*i can create one..*/ - { - thd = (struct thd 
*)pool_getzblk(p_thds); - if (thd == 0) { - rc = errno; - errUNLOCK(&lock); - logmsg(LOGMSG_ERROR, "handle_buf:failed calloc thread:%s\n", - strerror(errno)); - return reterr(curswap, /*thd*/ 0, iq, ERR_INTERNAL); - } - /*add holder for this one being born...*/ - num = busy.count; - listc_abl(&busy, thd); - if (iamwriter) { - write_thd_count++; - } - thd->iq = iq; - /* fprintf(stderr, "added3 %8.8x\n",thd);*/ - iq->where = "dispatched new"; - Pthread_cond_init(&thd->wakeup, 0); - nthdcreates++; -#ifdef MONITOR_STACK - rc = comdb2_pthread_create(&thd->tid, &attr, thd_req, - (void *)thd, stack_alloc, stack_sz); -#else - rc = pthread_create(&thd->tid, &attr, thd_req, (void *)thd); -#endif - -#if 0 - printf("%s:%d: thdpool CREATE THD=%p -> newTHD=%d iq=%p\n", __func__, __LINE__, pthread_self(), thd->tid, iq); -#endif - if (rc != 0) { - errUNLOCK(&lock); - perror_errnum("handle_buf:failed pthread_thread_start", rc); - /* This tends to happen when we're out of memory. Rather - * than limp onwards, we should just exit here. Hand off - * masterness if possible. 
*/ - if (debug_exit_on_pthread_create_error()) { - bdb_transfermaster(thedb->static_table.handle); - logmsg(LOGMSG_FATAL, - "%s:Exiting due to thread create errors\n", - __func__); - exit(1); - } - return reterr(curswap, thd, iq, ERR_INTERNAL); - } - /* added thread to thread pool.*/ - if (num >= MAXSTAT) - num = MAXSTAT - 1; - bkt_thd[num]++; /*count threads*/ - ndispatch++; - } - comdb2bma_transfer_priority(blobmem, thd->tid); - } + rc = init_ireq_legacy(dbenv, iq, sb, (uint8_t *)p_buf, p_buf_end, debug, + frommach, frompid, fromtask, sorese, qtype, + data_hndl, luxref, rqid, p_sinfo, curswap, comdbg_flags); + if (rc) { + logmsg(LOGMSG_ERROR, "handle_buf:failed to unpack req header\n"); + return reterr(curswap, /*thd*/ 0, iq, rc); + } + iq->sorese = sorese; + if (iq_setup_func) + iq_setup_func(iq, setup_data); - /* drain queue if too full */ - rc = q_reqs.count; - if (qtype != REQ_OFFLOAD && rc > gbl_maxqueue) { - struct dbq_entry_t *nextrq = NULL; - logmsg(LOGMSG_ERROR, - "THD=%p handle_buf:rejecting requests queue too full %d " - "(max %d)\n", - (void *)pthread_self(), rc, gbl_maxqueue); - - comdb2bma_yield_all(); - /* Dequeue the request I just queued. */ - nextrq = (struct dbq_entry_t *)listc_rbl(&q_reqs); - if (nextrq && nextrq == newent) { - iq = nextrq->obj; - iamwriter = is_req_write(iq->opcode) ? 1 : 0; - if (!iamwriter) { - listc_rfl(&rq_reqs, nextrq); - } - pool_relablk(pq_reqs, nextrq); - Pthread_mutex_unlock(&lock); - nqfulls++; - reterr_withfree(iq, ERR_REJECTED); - } else { - /* THIS can happen since the queue might be already full, - with requests we keep, and this could be a successfully - dispatched request (which is not at the head of the list - anymore). - If it is not me, stay in queue */ - listc_abl(&q_reqs, nextrq); - - iq = nextrq->obj; -#if 0 - fprintf(stderr, "SKIP DISCARDING iq=%p\n", iq); -#endif + if (iq->comdbg_flags == -1) + iq->comdbg_flags = 0; - /* paranoia; this cannot be read */ - iamwriter = is_req_write(iq->opcode) ? 
1 : 0; - if (!iamwriter) { - /* this should not be a read, unless code changed; reads are - not kept in the queue above the limit */ - abort(); - } + if (p_buf && p_buf[7] == OP_FWD_BLOCK_LE) + iq->comdbg_flags |= COMDBG_FLAG_FROM_LE; + iq->authdata = authdata; - Pthread_mutex_unlock(&lock); - } - } else { - Pthread_mutex_unlock(&lock); - } + if (doinline) { + thd_req_inline(iq); + return 0; } - if (ndispatch == 0) - nwaits++; + if (!is_req_write(iq->opcode)) { + pool = gbl_handle_buf_read_thdpool; + } else if (gbl_queue_use_dedicated_writers && (sorese != NULL) && sorese->is_qconsume_only == 1) { + pool = gbl_handle_buf_queue_thdpool; + } else { + pool = gbl_handle_buf_write_thdpool; + int n = gbl_maxwthreads - gbl_maxwthreadpenalty; + thdpool_set_maxthds(pool, n); + } + rc = thdpool_enqueue(pool, handle_buf_work_pp, iq, 1, NULL, (qtype == REQ_OFFLOAD) ? THDPOOL_FORCE_QUEUE : 0); + if (rc != 0) { + logmsg(LOGMSG_ERROR, "handle_buf:rejecting requests queue too full %d (max %d)\n", + thdpool_get_queue_depth(pool), thdpool_get_maxqueue(pool)); + reterr_withfree(iq, ERR_REJECTED); + } return 0; } @@ -1346,12 +853,6 @@ int handle_buf_main(struct dbenv *dbenv, SBUF2 *sb, const uint8_t *p_buf, rqid, 0, 0, 0, iq_setup_func, NULL, 0, NULL); } -void destroy_ireq(struct dbenv *dbenv, struct ireq *iq) -{ - LOCK(&lock) { pool_relablk(p_reqs, iq); } - UNLOCK(&lock); -} - static int is_req_write(int opcode) { if (opcode == OP_FWD_LBLOCK || opcode == OP_BLOCK || diff --git a/db/osqlblockproc.c b/db/osqlblockproc.c index 41dfa99581..e7fb688c51 100644 --- a/db/osqlblockproc.c +++ b/db/osqlblockproc.c @@ -1619,3 +1619,9 @@ int resume_sc_multiddl_txn(sc_list_t *scl) } return 0; } + + +const char *osql_last_usedb_tablename(osql_sess_t *sess) +{ + return (sess->tran != NULL) ? 
sess->tran->tablename : NULL; +} diff --git a/db/osqlblockproc.h b/db/osqlblockproc.h index fae7ad9e7d..2363ccbb99 100644 --- a/db/osqlblockproc.h +++ b/db/osqlblockproc.h @@ -115,4 +115,6 @@ void osql_bplog_set_blkseq(osql_sess_t *sess, struct ireq *iq); */ void osql_bplog_time_done(osql_bp_timings_t *tms); +/* Returns the tablename of the last usedb opcode */ +const char *osql_last_usedb_tablename(osql_sess_t *sess); #endif diff --git a/db/osqlcomm.c b/db/osqlcomm.c index dd246b2ae3..34da0a7a14 100644 --- a/db/osqlcomm.c +++ b/db/osqlcomm.c @@ -3680,7 +3680,7 @@ static void net_snap_uid_rpl(void *hndl, void *uptr, char *fromhost, } int gbl_disable_cnonce_blkseq; - +static void qconsume_check(osql_sess_t *sess, int type); /** * If "rpl" is a done packet, set xerr to error if any and return 1 * If "rpl" is a recognizable packet, returns the length of the data type is @@ -3737,6 +3737,7 @@ int osql_comm_is_done(osql_sess_t *sess, int type, char *rpl, int rpllen, sess->is_delayed = 1; break; } + qconsume_check(sess, type); return rc; } @@ -6230,6 +6231,22 @@ static inline int is_write_request(int type) } } +static void qconsume_check(osql_sess_t *sess, int type) +{ + const char *tblname; + if (sess == NULL || !is_write_request(type) || sess->is_qconsume_only == 0) + return; + + tblname = osql_last_usedb_tablename(sess); + + if (type == OSQL_DBQ_CONSUME) + sess->is_qconsume_only = 1; + else if ((type == OSQL_DELREC || type == OSQL_DELETE) && tblname != NULL && is_tablename_queue(tblname)) + sess->is_qconsume_only = 1; + else + sess->is_qconsume_only = 0; +} + void free_cached_idx(uint8_t **cached_idx); int gbl_disable_tpsc_tblvers = 0; diff --git a/db/osqlsession.c b/db/osqlsession.c index 8798073a48..fbf9bf9d4d 100644 --- a/db/osqlsession.c +++ b/db/osqlsession.c @@ -699,6 +699,7 @@ static osql_sess_t *_osql_sess_create(osql_sess_t *sess, char *tzname, int type, listc_init(&sess->participants, offsetof(struct participant, linkv)); sess->impl->clients = 1; + 
sess->is_qconsume_only = -1; /* unknown */ /* defaults to net */ init_bplog_net(&sess->target); diff --git a/db/process_message.c b/db/process_message.c index 867ae3c4ec..2c35ff4b02 100644 --- a/db/process_message.c +++ b/db/process_message.c @@ -3703,6 +3703,12 @@ int process_command(struct dbenv *dbenv, char *line, int lline, int st) thdpool_process_message(gbl_verify_thdpool, line, lline, st); else logmsg(LOGMSG_WARN, "verifypool is not initialized\n"); + } else if (tokcmp(tok, ltok, "handle_buf_read") == 0) { + thdpool_process_message(gbl_handle_buf_read_thdpool, line, lline, st); + } else if (tokcmp(tok, ltok, "handle_buf_write") == 0) { + thdpool_process_message(gbl_handle_buf_write_thdpool, line, lline, st); + } else if (tokcmp(tok, ltok, "handle_buf_queue") == 0) { + thdpool_process_message(gbl_handle_buf_queue_thdpool, line, lline, st); } else if (tokcmp(tok, ltok, "disttxn") == 0) { char dist_txnid[128] = {0}; int found = 0; diff --git a/schemachange/sc_queues.c b/schemachange/sc_queues.c index e50b143c56..5bb39cac04 100644 --- a/schemachange/sc_queues.c +++ b/schemachange/sc_queues.c @@ -103,14 +103,14 @@ int do_alter_queues_int(struct schema_change_type *sc) return rc; } +void handle_buf_set_queue_thdpool_maxthds(int nthds); void static add_to_qdbs(struct dbtable *db) { - thedb->qdbs = - realloc(thedb->qdbs, (thedb->num_qdbs + 1) * sizeof(struct dbtable *)); + thedb->qdbs = realloc(thedb->qdbs, (thedb->num_qdbs + 1) * sizeof(struct dbtable *)); thedb->qdbs[thedb->num_qdbs++] = db; - /* Add queue to the hash. 
*/ hash_add(thedb->qdb_hash, db); + handle_buf_set_queue_thdpool_maxthds(thedb->num_qdbs); } int static remove_from_qdbs(struct dbtable *db) diff --git a/tests/comdb2sys.test/comdb2sys.expected b/tests/comdb2sys.test/comdb2sys.expected index 710f156448..a33dbc6728 100644 --- a/tests/comdb2sys.test/comdb2sys.expected +++ b/tests/comdb2sys.test/comdb2sys.expected @@ -342,6 +342,9 @@ (name='max_views', description='Maximum number of views', value=1024) [SELECT * FROM comdb2_limits ORDER BY name] rc 0 (name='appsockpool') +(name='handle_buf_queue') +(name='handle_buf_read') +(name='handle_buf_write') (name='loadcache') (name='memptrickle') (name='osqlpfaultpool') diff --git a/tests/qthdpool.test/Makefile b/tests/qthdpool.test/Makefile new file mode 100644 index 0000000000..e05866ba3a --- /dev/null +++ b/tests/qthdpool.test/Makefile @@ -0,0 +1,8 @@ +ifeq ($(TESTSROOTDIR),) + include ../testcase.mk +else + include $(TESTSROOTDIR)/testcase.mk +endif +ifeq ($(TEST_TIMEOUT),) + export TEST_TIMEOUT=1m +endif diff --git a/tests/qthdpool.test/expected b/tests/qthdpool.test/expected new file mode 100644 index 0000000000..6b184f3953 --- /dev/null +++ b/tests/qthdpool.test/expected @@ -0,0 +1,9 @@ +t 1 +t 2 +t 3 +audit 2 +audit 3 +audit 1 +1 +2 +3 diff --git a/tests/qthdpool.test/runit b/tests/qthdpool.test/runit new file mode 100755 index 0000000000..44fc8ec9b6 --- /dev/null +++ b/tests/qthdpool.test/runit @@ -0,0 +1,55 @@ +#!/usr/bin/env bash +bash -n "$0" | exit 1 +set -e + +dbnm=$1 + +master=`cdb2sql --tabs ${CDB2_OPTIONS} $dbnm default 'SELECT host FROM comdb2_cluster WHERE is_master="Y"'` + +cdb2sql ${CDB2_OPTIONS} -s --tabs $dbnm default - >output.actual 2>&1 <qbefore +cdb2sql ${CDB2_OPTIONS} $dbnm default "EXEC PROCEDURE auditor()" # consume 1 +cdb2sql ${CDB2_OPTIONS} $dbnm default "EXEC PROCEDURE auditor()" # consume 2 +cdb2sql ${CDB2_OPTIONS} $dbnm default "EXEC PROCEDURE auditor()" # consume 3 +cdb2sql --tabs ${CDB2_OPTIONS} $dbnm default "SELECT 't', i FROM t" 
>>actual +cdb2sql --tabs ${CDB2_OPTIONS} $dbnm default "SELECT 'audit', i FROM audit" >>actual +cdb2sql --tabs $dbnm --host $master "EXEC PROCEDURE sys.cmd.send('handle_buf_queue stat')" >qafter +diff qbefore qafter + +# the sp only consumes. Verify that it's handled by the queue pool +cdb2sql --tabs $dbnm --host $master "EXEC PROCEDURE sys.cmd.send('handle_buf_write stat')" >wbefore +cdb2sql --tabs ${CDB2_OPTIONS} $dbnm default "EXEC PROCEDURE watcher()" >>actual # consume 1 +cdb2sql --tabs ${CDB2_OPTIONS} $dbnm default "EXEC PROCEDURE watcher()" >>actual # consume 2 +cdb2sql --tabs ${CDB2_OPTIONS} $dbnm default "EXEC PROCEDURE watcher()" >>actual # consume 3 +cdb2sql --tabs $dbnm --host $master "EXEC PROCEDURE sys.cmd.send('handle_buf_write stat')" >wafter +diff wbefore wafter + +# We consumed 3 items off the queue. Verify that. +cdb2sql --tabs $dbnm --host $master "EXEC PROCEDURE sys.cmd.send('handle_buf_queue stat')" | grep "Work items done immediate : 3" +diff actual expected diff --git a/tests/tunables.test/t00_all_tunables.expected b/tests/tunables.test/t00_all_tunables.expected index cd52f8854e..33f8b3dc0a 100644 --- a/tests/tunables.test/t00_all_tunables.expected +++ b/tests/tunables.test/t00_all_tunables.expected @@ -404,6 +404,36 @@ (name='gofast', description='', type='BOOLEAN', value='ON', read_only='N') (name='goslow', description='', type='BOOLEAN', value='OFF', read_only='N') (name='group_concat_memory_limit', description='Restrict GROUP_CONCAT from using more than this amount of memory; 0 implies SQLITE_MAX_LENGTH, the limit imposed by sqlite. 
(Default: 0)', type='INTEGER', value='0', read_only='Y') +(name='handle_buf_queue.dump_on_full', description='Dump status on full queue.', type='BOOLEAN', value='OFF', read_only='N') +(name='handle_buf_queue.exit_on_error', description='Exit on pthread error.', type='BOOLEAN', value='ON', read_only='N') +(name='handle_buf_queue.linger', description='Thread linger time (in seconds).', type='INTEGER', value='5', read_only='N') +(name='handle_buf_queue.longwait', description='Long wait alarm threshold (in milliseconds).', type='INTEGER', value='500', read_only='N') +(name='handle_buf_queue.maxagems', description='Maximum age for in-queue time (in milliseconds).', type='INTEGER', value='0', read_only='N') +(name='handle_buf_queue.maxq', description='Maximum size of queue.', type='INTEGER', value='192', read_only='N') +(name='handle_buf_queue.maxqover', description='Maximum client forced queued items above maxq.', type='INTEGER', value='0', read_only='N') +(name='handle_buf_queue.maxt', description='Maximum number of threads in the pool.', type='INTEGER', value='8', read_only='N') +(name='handle_buf_queue.mint', description='Minimum number of threads in the pool.', type='INTEGER', value='0', read_only='N') +(name='handle_buf_queue.stacksz', description='Thread stack size.', type='INTEGER', value='4194304', read_only='N') +(name='handle_buf_read.dump_on_full', description='Dump status on full queue.', type='BOOLEAN', value='OFF', read_only='N') +(name='handle_buf_read.exit_on_error', description='Exit on pthread error.', type='BOOLEAN', value='ON', read_only='N') +(name='handle_buf_read.linger', description='Thread linger time (in seconds).', type='INTEGER', value='5', read_only='N') +(name='handle_buf_read.longwait', description='Long wait alarm threshold (in milliseconds).', type='INTEGER', value='500', read_only='N') +(name='handle_buf_read.maxagems', description='Maximum age for in-queue time (in milliseconds).', type='INTEGER', value='0', read_only='N') 
+(name='handle_buf_read.maxq', description='Maximum size of queue.', type='INTEGER', value='192', read_only='N') +(name='handle_buf_read.maxqover', description='Maximum client forced queued items above maxq.', type='INTEGER', value='0', read_only='N') +(name='handle_buf_read.maxt', description='Maximum number of threads in the pool.', type='INTEGER', value='48', read_only='N') +(name='handle_buf_read.mint', description='Minimum number of threads in the pool.', type='INTEGER', value='0', read_only='N') +(name='handle_buf_read.stacksz', description='Thread stack size.', type='INTEGER', value='4194304', read_only='N') +(name='handle_buf_write.dump_on_full', description='Dump status on full queue.', type='BOOLEAN', value='OFF', read_only='N') +(name='handle_buf_write.exit_on_error', description='Exit on pthread error.', type='BOOLEAN', value='ON', read_only='N') +(name='handle_buf_write.linger', description='Thread linger time (in seconds).', type='INTEGER', value='5', read_only='N') +(name='handle_buf_write.longwait', description='Long wait alarm threshold (in milliseconds).', type='INTEGER', value='500', read_only='N') +(name='handle_buf_write.maxagems', description='Maximum age for in-queue time (in milliseconds).', type='INTEGER', value='0', read_only='N') +(name='handle_buf_write.maxq', description='Maximum size of queue.', type='INTEGER', value='192', read_only='N') +(name='handle_buf_write.maxqover', description='Maximum client forced queued items above maxq.', type='INTEGER', value='0', read_only='N') +(name='handle_buf_write.maxt', description='Maximum number of threads in the pool.', type='INTEGER', value='8', read_only='N') +(name='handle_buf_write.mint', description='Minimum number of threads in the pool.', type='INTEGER', value='0', read_only='N') +(name='handle_buf_write.stacksz', description='Thread stack size.', type='INTEGER', value='4194304', read_only='N') (name='heartbeat_check_time', description='Raise an error if no heartbeat for this amount of 
time (in secs). (Default: 5 secs)', type='INTEGER', value='5', read_only='Y') (name='hostile_takeover_retries', description='Attempt to take over mastership if the master machine is marked offline, and the current machine is online.', type='INTEGER', value='0', read_only='N') (name='hostname', description='', type='STRING', value='***', read_only='Y') @@ -761,6 +791,8 @@ (name='qscanmode', description='Enables queue scan mode optimisation.', type='BOOLEAN', value='OFF', read_only='N') (name='query_plan_percentage', description='Alarm if the average cost per row of current query plan is n percent above the cost for different query plan. (Default: 50)', type='DOUBLE', value='50', read_only='N') (name='query_plans', description='Keep track of query plans and their costs for each query', type='BOOLEAN', value='ON', read_only='N') +(name='queue_max_dedicated_writers', description='Max number of dedicated queue-consume writers. (Default: on)', type='INTEGER', value='16', read_only='N') +(name='queue_use_dedicated_writers', description='Whether queue-consumes are processed in dedicated writers. (Default: on)', type='BOOLEAN', value='ON', read_only='N') (name='queuedb_file_interval', description='Check on this interval each queuedb against its configured maximum file size. (Default: 60000ms)', type='INTEGER', value='60000', read_only='Y') (name='queuedb_file_threshold', description='Maximum queuedb file size (in MB) before enqueueing to the alternate file. (Default: 0)', type='INTEGER', value='0', read_only='Y') (name='queuedb_genid_filename', description='Use genid in queuedb filenames. (Default: on)', type='BOOLEAN', value='ON', read_only='Y') diff --git a/util/thdpool.c b/util/thdpool.c index 647dd34fd5..1ec52a0a42 100644 --- a/util/thdpool.c +++ b/util/thdpool.c @@ -63,6 +63,7 @@ struct thd { pthread_t tid; arch_tid archtid; struct thdpool *pool; + void *thddata; /* Work item that we need to do. 
*/ struct workitem work; @@ -347,6 +348,18 @@ void thdpool_foreach(struct thdpool *pool, thdpool_foreach_fn foreach_fn, UNLOCK(&pool->mutex); } +void thdpool_for_each_thd(struct thdpool *pool, thdpool_for_each_thd_fn for_each_thd_fn, void *user) +{ + LOCK(&pool->mutex) + { + struct thd *thd; + LISTC_FOR_EACH(&pool->thdlist, thd, thdlist_linkv) { + (for_each_thd_fn)(pool, thd->tid, thd->on_freelist, thd->thddata, user); + } + } + UNLOCK(&pool->mutex); +} + void thdpool_unset_exit(struct thdpool *pool) { pool->exit_on_create_fail = 0; } void thdpool_set_linger(struct thdpool *pool, unsigned lingersecs) @@ -704,6 +717,7 @@ static void *thdpool_thd(void *voidarg) thddata = alloca(pool->per_thread_data_sz); assert(thddata != NULL); memset(thddata, 0, pool->per_thread_data_sz); + thd->thddata = thddata; } init_fn = pool->init_fn;