Skip to content
26 changes: 18 additions & 8 deletions src/odbc/odbc.c
Original file line number Diff line number Diff line change
Expand Up @@ -872,7 +872,13 @@ odbc_lock_statement(TDS_STMT* stmt)

/* try with MARS */
if (!tds)
{
tds = tds_alloc_additional_socket(dbc_tds->conn);
if (tds)
{
tdsdump_log(TDS_DBG_INFO1, "MARS SID %d allocated new TDSSOCKET\n", tds->sid);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe MARS SID %d allocated for new TDSSOCKET\n ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

well, both of them are allocated really (the SID and the TDSSOCKET); how about with instead of for

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It sounds good to me

}
}
}
if (tds) {
tds->query_timeout = (stmt->attr.query_timeout != DEFAULT_QUERY_TIMEOUT) ?
Expand Down Expand Up @@ -922,15 +928,8 @@ odbc_unlock_statement(TDS_STMT* stmt)
tds_set_parent(tds, stmt->dbc);
stmt->tds = NULL;
}
#if ENABLE_ODBC_MARS
} else if (tds) {
if (tds->state == TDS_IDLE || tds->state == TDS_DEAD) {
assert(tds != stmt->dbc->tds_socket);
tds_free_socket(tds);
stmt->tds = NULL;
}
#endif
}
/* NOTE: MARS socket now released when statement freed. */
tds_mutex_unlock(&stmt->dbc->mtx);
}

Expand Down Expand Up @@ -4464,6 +4463,17 @@ odbc_SQLFreeStmt(SQLHSTMT hstmt, SQLUSMALLINT fOption, int force)
tds_free_param_results(stmt->params);
odbc_errs_reset(&stmt->errs);
odbc_unlock_statement(stmt);
#if ENABLE_ODBC_MARS
if ( stmt->tds && stmt->tds != stmt->dbc->tds_socket )
{
if (!(tds->state == TDS_IDLE || tds->state == TDS_DEAD)) {
tdsdump_log(TDS_DBG_WARN, "MARS SID %d was not idle/dead\n", tds->sid);
}
tdsdump_log(TDS_DBG_INFO1, "MARS SID %d socket freeing\n", tds->sid);
tds_free_socket(tds);
stmt->tds = NULL;
}
#endif
tds_dstr_free(&stmt->cursor_name);
tds_dstr_free(&stmt->attr.qn_msgtext);
tds_dstr_free(&stmt->attr.qn_options);
Expand Down
6 changes: 5 additions & 1 deletion src/odbc/unittests/common.c
Original file line number Diff line number Diff line change
Expand Up @@ -632,7 +632,11 @@ odbc_command_proc(HSTMT stmt, const char *command, const char *file, int line, c
SQLRETURN ret;
ODBC_BUF *odbc_buf = NULL;

printf("%s\n", command);
if (command[0] == '!')
++command;
else
printf("%s\n", command);

ret = odbc_check_res(file, line, SQLExecDirect(stmt, T(command), SQL_NTS), SQL_HANDLE_STMT, stmt, "odbc_command", res);
ODBC_FREE();
return ret;
Expand Down
116 changes: 104 additions & 12 deletions src/odbc/unittests/mars1.c
Original file line number Diff line number Diff line change
@@ -1,7 +1,55 @@

#include "common.h"

/* first MARS test, test 2 concurrent recordset */
/*
* Memory leak tracking apparatus
*/
#include <malloc.h>
#include <assert.h>
#ifdef __VMS
#define __NEW_STARLET
#include <starlet.h>
#include <iledef.h>
#include <jpidef.h>
#include <stsdef.h>
#endif

static size_t
memory_usage(void)
{
size_t ret = 0;
#if defined(HAVE__HEAPWALK)
_HEAPINFO hinfo;
int heapstatus;

hinfo._pentry = NULL;
while ((heapstatus = _heapwalk(&hinfo)) == _HEAPOK) {
if (hinfo._useflag == _USEDENTRY)
ret += hinfo._size;
}
assert(heapstatus == _HEAPEMPTY || heapstatus == _HEAPEND);

#elif defined(HAVE_MALLINFO2)
ret = mallinfo2().uordblks;

#elif defined(__VMS)
ILE3 jpi_items[2] = { 0 };
unsigned long ppgcnt;
unsigned short ppgcnt_len;
jpi_items[0].ile3$w_length = sizeof(ppgcnt);
jpi_items[0].ile3$w_code = JPI$_PPGCNT;
jpi_items[0].ile3$ps_bufaddr = &ppgcnt;
jpi_items[0].ile3$ps_retlen_addr = &ppgcnt_len;
int status = SYS$GETJPIW(0, 0, 0, &jpi_items, 0, 0, 0);
ret = $VMS_STATUS_SUCCESS(status) ? ppgcnt : SIZE_MAX;
#else
ret = (size_t)(mallinfo().uordblks);

#endif
return ret;
}

/* first MARS test, test 2 concurrent recordset */
#define SET_STMT(n) do { \
if (pcur_stmt != &n) { \
if (pcur_stmt) *pcur_stmt = odbc_stmt; \
Expand All @@ -22,7 +70,6 @@ EndTransaction(SQLSMALLINT type)
CHKEndTran(SQL_HANDLE_DBC, odbc_conn, type, "S");
}


static void
my_attrs(void)
{
Expand All @@ -32,9 +79,11 @@ my_attrs(void)
TEST_MAIN()
{
SQLINTEGER len, out;
int i;
int i, j;
SQLHSTMT stmt1, stmt2;
SQLHSTMT *pcur_stmt = NULL;
long bind1;
char bind2[20] = "parameters";

odbc_use_version3 = true;
odbc_set_conn_attr = my_attrs;
Expand Down Expand Up @@ -77,18 +126,63 @@ TEST_MAIN()
AutoCommit(SQL_AUTOCOMMIT_OFF);

/* try to do a select which return a lot of data (to test server didn't cache everything) */
odbc_command("select a.n, b.n, a.v from #mars1 a, #mars1 b order by a.n, b.n");
odbc_command("select a.n, b.n, c.n, a.v from #mars1 a, #mars1 b, #mars1 c order by a.n, b.n, c.n");
CHKFetch("S");

/* try to do other commands */
CHKAllocStmt(&stmt2, "S");
SET_STMT(stmt2);
odbc_command("insert into #mars2 values(1, 'foo')");
SET_STMT(stmt1);

CHKFetch("S");

/* Use a parameterized insert. This causes DONEINPROC to be returned by SQL Server,
* leading to result_type==TDS_CMD_DONE when it's complete. Without the parameter,
* result_type == TDS_DONE_RESULT.
* And in odbc_SQLExecute(), it calls odbc_unlock_statement() for TDS_CMD_DONE, but
* not for TDS_DONE_RESULT. (We don't know why...)
* This means that the stmt->tds TDSSOCKET struct is completely freed after every
* iteration of the insert if and only if it was a parameterized insert. So we need
* to test both parameterized and non-parameterized inserts.
*/
SQLBindParameter(stmt2, 1, SQL_PARAM_INPUT, SQL_C_SLONG, SQL_INTEGER, 0, 0, &bind1, 0, NULL);
SQLBindParameter(stmt2, 2, SQL_PARAM_INPUT, SQL_C_CHAR, SQL_CHAR, 0, 0, &bind2, 20, NULL);

/* adjust these parameters for memory leak testing */
/* TODO a way for this test to detect memory leak here. */
const int n_iterations = 20000; /* E.g. 200000 */
const int freq_parameterized = 2; /* set 1 to parameterize all, INT_MAX for none */

size_t memory_usage_watermark = 0;

for (i= 1; i <= n_iterations; ++i)
{
SET_STMT(stmt2);

// Test option - force reallocation of socket
// odbc_reset_statement();

if (i % freq_parameterized == 0)
{
bind1 = i;
odbc_command("!insert into #mars2 values(?, ?)");
}
else
odbc_command("!insert into #mars2 values(1, 'foo')");

size_t newmu = memory_usage();
if (newmu > memory_usage_watermark)
{
printf("Memory usage increased to %lu on iteration %d\n", (unsigned long)newmu, i);
memory_usage_watermark = newmu;
}
// Perform several fetches for each insert, so we also test continuing to draw
// further packets of the fetch
SET_STMT(stmt1);
for (j = 0; j < 10; ++j)
{
CHKFetch("S");
}
}
printf("Performed %d inserts while fetching.\n", i - 1);

/* reset statements */
SET_STMT(stmt1);
odbc_reset_statement();
SET_STMT(stmt2);
odbc_reset_statement();
Expand All @@ -114,8 +208,6 @@ TEST_MAIN()

EndTransaction(SQL_COMMIT);

/* TODO test receiving large data should not take much memory */
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I cannot see the implementation of this in the test.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I replaced that comment with /* TODO a way for this test to detect memory leak here. */ a bit further up -- unless you meant something else? Receiving one row of large data would take as much memory as it takes to process that data, so I took your comment to mean that repeated iterations of the test should not keep increasing memory.


odbc_disconnect();
return 0;
}
Expand Down
16 changes: 12 additions & 4 deletions src/tds/packet.c
Original file line number Diff line number Diff line change
Expand Up @@ -521,6 +521,7 @@ int
tds_read_packet(TDSSOCKET * tds)
{
#if ENABLE_ODBC_MARS
TDS_UINT this_seq;
TDSCONNECTION *conn = tds->conn;

tds_mutex_lock(&conn->list_mtx);
Expand Down Expand Up @@ -554,13 +555,20 @@ tds_read_packet(TDSSOCKET * tds)
tds->in_pos = 8;
tds->in_flag = tds->in_buf[0];

/* send acknowledge if needed */
if ((int32_t) (tds->recv_seq + 2 - tds->recv_wnd) >= 0)
tds_update_recv_wnd(tds, tds->recv_seq + 4);

/* Look ahead by up to 4 packets */
this_seq = TDS_GET_A4LE(&((const TDS72_SMP_HEADER*)packet->buf)->seq);
if ((int32_t)(this_seq + 2 - tds->recv_wnd) >= 0)
tds_update_recv_wnd(tds, this_seq + 4);
return tds->in_len;
}

#if ENABLE_EXTRA_CHECKS
{
TDS_UINT np = 0;
for (p_packet = &conn->packets; *p_packet; p_packet = &(*p_packet)->next) ++np;
tdsdump_log(TDS_DBG_NETWORK, "MARS SID %d queued packets %u\n", tds->sid, np);
}
#endif
/* network ok ? process network */
if (!conn->in_net_tds) {
tds_connection_network(conn, tds, 0);
Expand Down
Loading