diff --git a/src/odbc/odbc.c b/src/odbc/odbc.c index 89a950e04..78bc4b456 100644 --- a/src/odbc/odbc.c +++ b/src/odbc/odbc.c @@ -872,7 +872,13 @@ odbc_lock_statement(TDS_STMT* stmt) /* try with MARS */ if (!tds) + { tds = tds_alloc_additional_socket(dbc_tds->conn); + if (tds) + { + tdsdump_log(TDS_DBG_INFO1, "MARS SID %d allocated new TDSSOCKET\n", tds->sid); + } + } } if (tds) { tds->query_timeout = (stmt->attr.query_timeout != DEFAULT_QUERY_TIMEOUT) ? @@ -922,15 +928,8 @@ odbc_unlock_statement(TDS_STMT* stmt) tds_set_parent(tds, stmt->dbc); stmt->tds = NULL; } -#if ENABLE_ODBC_MARS - } else if (tds) { - if (tds->state == TDS_IDLE || tds->state == TDS_DEAD) { - assert(tds != stmt->dbc->tds_socket); - tds_free_socket(tds); - stmt->tds = NULL; - } -#endif } + /* NOTE: MARS socket now released when statement freed. */ tds_mutex_unlock(&stmt->dbc->mtx); } @@ -4464,6 +4463,17 @@ odbc_SQLFreeStmt(SQLHSTMT hstmt, SQLUSMALLINT fOption, int force) tds_free_param_results(stmt->params); odbc_errs_reset(&stmt->errs); odbc_unlock_statement(stmt); +#if ENABLE_ODBC_MARS + if ( stmt->tds && stmt->tds != stmt->dbc->tds_socket ) + { + if (!(tds->state == TDS_IDLE || tds->state == TDS_DEAD)) { + tdsdump_log(TDS_DBG_WARN, "MARS SID %d was not idle/dead\n", tds->sid); + } + tdsdump_log(TDS_DBG_INFO1, "MARS SID %d socket freeing\n", tds->sid); + tds_free_socket(tds); + stmt->tds = NULL; + } +#endif tds_dstr_free(&stmt->cursor_name); tds_dstr_free(&stmt->attr.qn_msgtext); tds_dstr_free(&stmt->attr.qn_options); diff --git a/src/odbc/unittests/common.c b/src/odbc/unittests/common.c index 093a01044..c9591466b 100644 --- a/src/odbc/unittests/common.c +++ b/src/odbc/unittests/common.c @@ -632,7 +632,11 @@ odbc_command_proc(HSTMT stmt, const char *command, const char *file, int line, c SQLRETURN ret; ODBC_BUF *odbc_buf = NULL; - printf("%s\n", command); + if (command[0] == '!') + ++command; + else + printf("%s\n", command); + ret = odbc_check_res(file, line, SQLExecDirect(stmt, T(command), SQL_NTS), SQL_HANDLE_STMT, stmt, "odbc_command", res); ODBC_FREE(); return ret; diff --git a/src/odbc/unittests/mars1.c b/src/odbc/unittests/mars1.c index e1f5982cf..d17465cf6 100644 --- a/src/odbc/unittests/mars1.c +++ b/src/odbc/unittests/mars1.c @@ -1,7 +1,55 @@ + #include "common.h" -/* first MARS test, test 2 concurrent recordset */ +/* + * Memory leak tracking apparatus + */ +#include +#include +#ifdef __VMS +#define __NEW_STARLET +#include +#include +#include +#include +#endif + +static size_t +memory_usage(void) +{ + size_t ret = 0; +#if defined(HAVE__HEAPWALK) + _HEAPINFO hinfo; + int heapstatus; + + hinfo._pentry = NULL; + while ((heapstatus = _heapwalk(&hinfo)) == _HEAPOK) { + if (hinfo._useflag == _USEDENTRY) + ret += hinfo._size; + } + assert(heapstatus == _HEAPEMPTY || heapstatus == _HEAPEND); + +#elif defined(HAVE_MALLINFO2) + ret = mallinfo2().uordblks; + +#elif defined(__VMS) + ILE3 jpi_items[2] = { 0 }; + unsigned long ppgcnt; + unsigned short ppgcnt_len; + jpi_items[0].ile3$w_length = sizeof(ppgcnt); + jpi_items[0].ile3$w_code = JPI$_PPGCNT; + jpi_items[0].ile3$ps_bufaddr = &ppgcnt; + jpi_items[0].ile3$ps_retlen_addr = &ppgcnt_len; + int status = SYS$GETJPIW(0, 0, 0, &jpi_items, 0, 0, 0); + ret = $VMS_STATUS_SUCCESS(status) ? ppgcnt : SIZE_MAX; +#else + ret = (size_t)(mallinfo().uordblks); + +#endif + return ret; +} +/* first MARS test, test 2 concurrent recordset */ #define SET_STMT(n) do { \ if (pcur_stmt != &n) { \ if (pcur_stmt) *pcur_stmt = odbc_stmt; \ @@ -22,7 +70,6 @@ EndTransaction(SQLSMALLINT type) CHKEndTran(SQL_HANDLE_DBC, odbc_conn, type, "S"); } - static void my_attrs(void) { @@ -32,9 +79,11 @@ my_attrs(void) TEST_MAIN() { SQLINTEGER len, out; - int i; + int i, j; SQLHSTMT stmt1, stmt2; SQLHSTMT *pcur_stmt = NULL; + long bind1; + char bind2[20] = "parameters"; odbc_use_version3 = true; odbc_set_conn_attr = my_attrs; @@ -77,18 +126,63 @@ TEST_MAIN() AutoCommit(SQL_AUTOCOMMIT_OFF); /* try to do a select which return a lot of data (to test server didn't cache everything) */ - odbc_command("select a.n, b.n, a.v from #mars1 a, #mars1 b order by a.n, b.n"); + odbc_command("select a.n, b.n, c.n, a.v from #mars1 a, #mars1 b, #mars1 c order by a.n, b.n, c.n"); CHKFetch("S"); - /* try to do other commands */ CHKAllocStmt(&stmt2, "S"); - SET_STMT(stmt2); - odbc_command("insert into #mars2 values(1, 'foo')"); - SET_STMT(stmt1); - - CHKFetch("S"); + /* Use a parameterized insert. This causes DONEINPROC to be returned by SQL Server, + * leading to result_type==TDS_CMD_DONE when it's complete. Without the parameter, + * result_type == TDS_DONE_RESULT. + * And in odbc_SQLExecute(), it calls odbc_unlock_statement() for TDS_CMD_DONE, but + * not for TDS_DONE_RESULT. (We don't know why...) + * This means that the stmt->tds TDSSOCKET struct is completely freed after every + * iteration of the insert if and only if it was a parameterized insert. So we need + * to test both parameterized and non-parameterized inserts. + */ + SQLBindParameter(stmt2, 1, SQL_PARAM_INPUT, SQL_C_SLONG, SQL_INTEGER, 0, 0, &bind1, 0, NULL); + SQLBindParameter(stmt2, 2, SQL_PARAM_INPUT, SQL_C_CHAR, SQL_CHAR, 0, 0, &bind2, 20, NULL); + + /* adjust these parameters for memory leak testing */ + /* TODO a way for this test to detect memory leak here. */ + const int n_iterations = 20000; /* E.g. 200000 */ + const int freq_parameterized = 2; /* set 1 to parameterize all, INT_MAX for none */ + + size_t memory_usage_watermark = 0; + + for (i= 1; i <= n_iterations; ++i) + { + SET_STMT(stmt2); + + // Test option - force reallocation of socket + // odbc_reset_statement(); + + if (i % freq_parameterized == 0) + { + bind1 = i; + odbc_command("!insert into #mars2 values(?, ?)"); + } + else + odbc_command("!insert into #mars2 values(1, 'foo')"); + + size_t newmu = memory_usage(); + if (newmu > memory_usage_watermark) + { + printf("Memory usage increased to %lu on iteration %d\n", (unsigned long)newmu, i); + memory_usage_watermark = newmu; + } + // Perform several fetches for each insert, so we also test continuing to draw + // further packets of the fetch + SET_STMT(stmt1); + for (j = 0; j < 10; ++j) + { + CHKFetch("S"); + } + } + printf("Performed %d inserts while fetching.\n", i - 1); + /* reset statements */ + SET_STMT(stmt1); odbc_reset_statement(); SET_STMT(stmt2); odbc_reset_statement(); @@ -114,8 +208,6 @@ TEST_MAIN() EndTransaction(SQL_COMMIT); - /* TODO test receiving large data should not take much memory */ - odbc_disconnect(); return 0; } diff --git a/src/tds/packet.c b/src/tds/packet.c index 3ec70b562..eee043b79 100644 --- a/src/tds/packet.c +++ b/src/tds/packet.c @@ -521,6 +521,7 @@ int tds_read_packet(TDSSOCKET * tds) { #if ENABLE_ODBC_MARS + TDS_UINT this_seq; TDSCONNECTION *conn = tds->conn; tds_mutex_lock(&conn->list_mtx); @@ -554,13 +555,20 @@ tds_read_packet(TDSSOCKET * tds) tds->in_pos = 8; tds->in_flag = tds->in_buf[0]; - /* send acknowledge if needed */ - if ((int32_t) (tds->recv_seq + 2 - tds->recv_wnd) >= 0) - tds_update_recv_wnd(tds, tds->recv_seq + 4); - + /* Look ahead by up to 4 packets */ + this_seq = TDS_GET_A4LE(&((const TDS72_SMP_HEADER*)packet->buf)->seq); + if ((int32_t)(this_seq + 2 - tds->recv_wnd) >= 0) + tds_update_recv_wnd(tds, this_seq + 4); return tds->in_len; } +#if ENABLE_EXTRA_CHECKS + { + TDS_UINT np = 0; + for (p_packet = &conn->packets; *p_packet; p_packet = &(*p_packet)->next) ++np; + tdsdump_log(TDS_DBG_NETWORK, "MARS SID %d queued packets %u\n", tds->sid, np); + } +#endif /* network ok ? process network */ if (!conn->in_net_tds) { tds_connection_network(conn, tds, 0);