diff --git a/.gitignore b/.gitignore index d4704003..49f05b1e 100644 --- a/.gitignore +++ b/.gitignore @@ -62,3 +62,7 @@ xxx_* # The Access unit tests copy empty.accdb and empty.mdb to these names and use them. test.accdb test.mdb + +# Don't commit bcp test files. +test*.bcp +bcp.errors diff --git a/src/bcp.cpp b/src/bcp.cpp new file mode 100644 index 00000000..29764f65 --- /dev/null +++ b/src/bcp.cpp @@ -0,0 +1,289 @@ +// Implementation for the Connection.bcp() method. + +#include "pyodbc.h" +#include "wrapper.h" +#include "pyodbcmodule.h" +#include "textenc.h" +#include "connection.h" +#include "errors.h" +#include "bcp.h" + +#ifdef _MSC_VER + #include + #define WINAPI_OR_CDECL WINAPI + typedef FARPROC _BCP_FUNC; +#else + #include + #define WINAPI_OR_CDECL /* nothing */ + typedef void* _BCP_FUNC; +#endif + +#define BCP_DEBUG 0 + +// ODBC BCP constants. +#define FAIL 0 +#define SUCCEED 1 +#define DB_IN 1 +#define DB_OUT 2 +#define BCPMAXERRS 1 // Sets max errors allowed +#define BCPFIRST 2 // Sets first row to be copied out +#define BCPLAST 3 // Sets number of rows to be copied out + +// Signatures for BCP calls. +typedef int (WINAPI_OR_CDECL *_BCP_INIT)(HDBC, SQLWCHAR*, SQLWCHAR*, SQLWCHAR*, int); +typedef int (WINAPI_OR_CDECL *_BCP_READFMT)(HDBC, SQLWCHAR*); +typedef int (WINAPI_OR_CDECL *_BCP_EXEC)(HDBC, long*); +typedef int (WINAPI_OR_CDECL *_BCP_CONTROL)(HDBC, int, void*); + +// BCP functions +static _BCP_INIT bcp_init = 0; +static _BCP_READFMT bcp_readfmt = 0; +static _BCP_EXEC bcp_exec = 0; +static _BCP_CONTROL bcp_control = 0; + + +#ifdef __linux__ +// Load the driver's library. +static void* _bcplib; +static void* _load_bcplib(HDBC hdbc) +{ + // Ask the DM for the file name of the driver's library. + char name[1024]; + SQLSMALLINT cch; + SQLRETURN rc = SQLGetInfo(hdbc, SQL_DRIVER_NAME, name, sizeof name, &cch); + if (rc != SQL_SUCCESS) + return NULL; + + // If we're lucky, that's all we need. + void *handle = dlopen(name, RTLD_NOLOAD | RTLD_LAZY); + if (handle) + return handle; + + // As we really expected, we need the full path for the library. + FILE *fp = fopen("/proc/self/maps", "r"); + if (!fp) + return NULL; + char line[1024]; + while (fgets(line, sizeof line, fp)) { + char *path = strchr(line, '/'); + if (path) { + path[strcspn(path, "\r\n")] = '\0'; + char *base = strrchr(path, '/'); + base = base ? base + 1 : path; + if (strcmp(base, name) == 0) { + handle = dlopen(path, RTLD_NOLOAD | RTLD_LAZY); + break; + } + } + } + fclose(fp); + return handle; +} +#endif + +// Find one of the bcp API functions; different strategies for each platform. +static _BCP_FUNC _find_bcp_function(char* name) +{ +#ifdef _WIN32 + static size_t count; + static HMODULE mods[256]; + static HMODULE odbclib; + if (!count) { + DWORD needed = 0; + if (!EnumProcessModules(GetCurrentProcess(), mods, sizeof(mods), &needed)) + return 0; + count = needed / sizeof(HMODULE); + } + if (odbclib) + return GetProcAddress(odbclib, name); + for (size_t i = 0; i < count; ++i) { + _BCP_FUNC func = GetProcAddress(mods[i], name); + if (func) { + odbclib = mods[i]; + return func; + } + } + return 0; +#else +#ifdef __linux__ + return _bcplib ? dlsym(_bcplib, name) : NULL; +#else + return dlsym(RTLD_DEFAULT, name); +#endif +#endif +} + +// Dynamically locate the bcp API functions we need (returns false if we can't find them). +static bool _load_bcp_functions(HDBC hdbc) +{ + if (!hdbc) // only really needed for Linux, but this keeps the compiler happy :) + return false; +#ifdef __linux__ + _bcplib = _load_bcplib(hdbc); + if (!_bcplib) + return false; +#endif + bcp_init = (_BCP_INIT) _find_bcp_function("bcp_initW"); + bcp_readfmt = (_BCP_READFMT)_find_bcp_function("bcp_readfmtW"); + bcp_exec = (_BCP_EXEC) _find_bcp_function("bcp_exec"); + bcp_control = (_BCP_CONTROL)_find_bcp_function("bcp_control"); +#ifdef __linux__ + dlclose(_bcplib); // just releases the handle, doesn't unload + _bcplib = NULL; +#endif + return bcp_init && bcp_readfmt && bcp_exec && bcp_control; +} + +// Apply a control option if the user provided a value, returning false on failure. +bool _apply_int_option(Connection* conn, PyObject* value, int option, const char* name) +{ + if (value == Py_None) + return true; + if (!PyLong_Check(value)) { + PyErr_Format(ProgrammingError, "%s must be an integer", name); + return false; + } + long intval = PyLong_AsLong(value); + if (bcp_control(conn->hdbc, option, (void*)intval) != SUCCEED) { + PyErr_Format(OperationalError, "failure setting %s", name); + return false; + } + return true; +} + +// Prepare and execute a bcp operation. +PyObject* _bcp_impl(PyObject* py_conn, const BCP_OPTS& opts) +{ + int bcp_rc = SUCCEED; + long row_count = 0; + +#if BCP_DEBUG + // Show the arguments. + printf("action : %ld\n", opts.action); + printf("table : "); PyObject_Print(opts.table, stdout, 0); printf("\n"); + printf("datafile : "); PyObject_Print(opts.datafile, stdout, 0); printf("\n"); + printf("formatfile : "); PyObject_Print(opts.formatfile, stdout, 0); printf("\n"); + printf("errorfile : "); PyObject_Print(opts.errorfile, stdout, 0); printf("\n"); + printf("firstrow : "); PyObject_Print(opts.firstrow, stdout, 0); printf("\n"); + printf("lastrow : "); PyObject_Print(opts.lastrow, stdout, 0); printf("\n"); + printf("maxerrors : "); PyObject_Print(opts.maxerrors, stdout, 0); printf("\n"); +#endif + + // Make sure we have a valid connection. + if (!py_conn || !Connection_Check(py_conn)) { + PyErr_SetString(ProgrammingError, "first argument must be a valid connection"); + return 0; + } + Connection* conn = (Connection*)py_conn; + if (conn->hdbc == SQL_NULL_HANDLE) { + PyErr_SetString(ProgrammingError, "attempt to use a closed connection."); + return 0; + } + + // Verify that bcp is enabled for this connection. + if (!conn->bcp_enabled) { + PyErr_SetString(ProgrammingError, "bcp not supported by this driver."); + return 0; + } + + // Load the BCP function pointers. + if (!_load_bcp_functions(conn->hdbc)) { + PyErr_SetString(OperationalError, "bcp functions not provided by driver"); + return 0; + } + + // Get the required arguments. Note that we use less generic names in public-facing API. + if (opts.action != DB_IN && opts.action != DB_OUT) { + PyErr_SetString(ProgrammingError, "action must be pyodbc.BCP_IN or pyodbc.BCP_OUT"); + return 0; + } + SQLWChar table(opts.table, ENCSTR_UTF16NE); + if (opts.datafile == Py_None) { + PyErr_SetString(ProgrammingError, "datafile is a required argument"); + return 0; + } + PyObject* datafile_path = PyOS_FSPath(opts.datafile); + if (!datafile_path) + return 0; // exception already raised by PyOS_FSPath() + if (!PyUnicode_Check(datafile_path)) { + Py_DECREF(datafile_path); + PyErr_SetString(PyExc_TypeError, "datafile must be a str or pathlib.Path"); + return 0; + } + SQLWChar datafile(datafile_path, ENCSTR_UTF16NE); + + // The error filename is optional. + SQLWChar errorfile; + PyObject* errorfile_path = 0; + if (opts.errorfile != Py_None) { + errorfile_path = PyOS_FSPath(opts.errorfile); + if (!errorfile_path) { + Py_DECREF(datafile_path); + return 0; // exception set by PyOS_FSPath() + } + if (!PyUnicode_Check(errorfile_path)) { + Py_DECREF(datafile_path); + Py_DECREF(errorfile_path); + PyErr_SetString(PyExc_TypeError, "errorfile must be a str or pathlib.Path"); + return 0; + } + errorfile.set(errorfile_path, ENCSTR_UTF16NE); + } + + // Initialize the bcp job. + Py_BEGIN_ALLOW_THREADS + bcp_rc = bcp_init(conn->hdbc, (SQLWCHAR*)table, (SQLWCHAR*)datafile, (SQLWCHAR*)errorfile, opts.action); + Py_END_ALLOW_THREADS + Py_DECREF(datafile_path); + Py_XDECREF(errorfile_path); + if (conn->hdbc == SQL_NULL_HANDLE) { + // The connection was closed by another thread. + PyErr_SetString(ProgrammingError, "connection was closed."); + return 0; + } + if (bcp_rc != SUCCEED) { + PyErr_SetString(OperationalError, "bcp_init failure"); + return 0; + } + + // Read the transfer format file if requested. + if (opts.formatfile != Py_None) { + PyObject* formatfile_path = PyOS_FSPath(opts.formatfile); + if (!formatfile_path) + return 0; // exception is already set + if (!PyUnicode_Check(formatfile_path)) { + PyErr_SetString(ProgrammingError, "formatfile must be a str or pathlib.Path"); + return 0; + } + SQLWChar formatfile(formatfile_path, ENCSTR_UTF16NE); + Py_BEGIN_ALLOW_THREADS + bcp_rc = bcp_readfmt(conn->hdbc, (SQLWCHAR*)formatfile); + Py_END_ALLOW_THREADS + Py_DECREF(formatfile_path); + if (conn->hdbc == SQL_NULL_HANDLE) { + PyErr_SetString(ProgrammingError, "connection was closed."); + return 0; + } + if (bcp_rc != SUCCEED) { + PyErr_SetString(OperationalError, "bcp_readfmt failure"); + return 0; + } + } + + // Apply the rest of the options specified (no need to release the GIL for these). + if (!_apply_int_option(conn, opts.firstrow, BCPFIRST, "firstrow" )) return 0; + if (!_apply_int_option(conn, opts.lastrow, BCPLAST, "lastrow" )) return 0; + if (!_apply_int_option(conn, opts.maxerrors, BCPMAXERRS, "maxerrors")) return 0; + + // Perform the transfer. + Py_BEGIN_ALLOW_THREADS + bcp_rc = bcp_exec(conn->hdbc, &row_count); + Py_END_ALLOW_THREADS + if (bcp_rc != SUCCEED) { + PyErr_SetString(OperationalError, "bcp_exec failure"); + return 0; + } + + // Return the number of rows transferred. + return PyLong_FromLong(row_count); +} diff --git a/src/bcp.h b/src/bcp.h new file mode 100644 index 00000000..ebbb6257 --- /dev/null +++ b/src/bcp.h @@ -0,0 +1,29 @@ +/** + * Types, constants, and signatures needed for BCP. + */ +#ifndef _BCP_H_ +#define _BCP_H_ + +#include + +// BCP constants. +#define SQL_BCP_ON 1L +#define SQL_COPT_SS_BCP 1219 // Allow BCP usage on connection + +// Arguments for a call to the bcp() method. +struct BCP_OPTS { + // Required positional-only arguments. + long action; + PyObject* table; + PyObject* datafile; + + // Optional keyword-only arguments. + PyObject* formatfile; + PyObject* errorfile; + PyObject* firstrow; + PyObject* lastrow; + PyObject* maxerrors; +}; +PyObject* _bcp_impl(PyObject* conn, const BCP_OPTS& opts); + +#endif // _BCP_H_ diff --git a/src/connection.cpp b/src/connection.cpp index 1a50d2b5..65f22357 100644 --- a/src/connection.cpp +++ b/src/connection.cpp @@ -13,6 +13,7 @@ #include "textenc.h" #include "connection.h" #include "cursor.h" +#include "bcp.h" #include "pyodbcmodule.h" #include "errors.h" #include "cnxninfo.h" @@ -213,6 +214,9 @@ PyObject* Connection_New(PyObject* pConnectString, bool fAutoCommit, long timeou } } + // Find out if bcp is supported. We have to do this *before* calling SQLDriverConnect(). + bool bcp_enabled = SQL_SUCCEEDED(SQLSetConnectAttr(hdbc, SQL_COPT_SS_BCP, (void *)SQL_BCP_ON, SQL_IS_INTEGER)); + if (!Connect(pConnectString, hdbc, timeout, encoding)) { // Connect has already set an exception. @@ -246,6 +250,7 @@ PyObject* Connection_New(PyObject* pConnectString, bool fAutoCommit, long timeou cnxn->hdbc = hdbc; cnxn->nAutoCommit = fAutoCommit ? SQL_AUTOCOMMIT_ON : SQL_AUTOCOMMIT_OFF; + cnxn->bcp_enabled = bcp_enabled; cnxn->searchescape = 0; cnxn->maxwrite = 0; cnxn->timeout = 0; @@ -1325,6 +1330,41 @@ static PyObject* Connection_setdecoding(PyObject* self, PyObject* args, PyObject Py_RETURN_NONE; } +static char bcp_doc[] = + "bcp(self, action, table, datafile, /, **kwargs)\n\n" + "Required positional arguments\n" + " action: BCP_IN or BCP_OUT\n" + " table: required name of table to copy or populate\n" + " datafile: path to table data file\n\n" + "Optional keyword arguments\n" + " formatfile: custom specification for copying\n" + " errorfile: used to report rows which failed import\n" + " firstrow: used to skip past the first rows (1-based)\n" + " lastrow: tells bcp to stop after processing the row at this position\n" + " maxerrors: tells bcp to abort after detecting this many errors (default 10)"; +static PyObject* Connection_bcp(PyObject* self, PyObject* args, PyObject *kwargs) +{ + // Arguments. + BCP_OPTS opts; + opts.formatfile = Py_None; + opts.errorfile = Py_None; + opts.firstrow = Py_None; + opts.lastrow = Py_None; + opts.maxerrors = Py_None; + + // First three arguments are positional-only. + static char* kwlist[] = { "", "", "", "formatfile", "errorfile", "firstrow", "lastrow", "maxerrors", NULL }; + + if (!PyArg_ParseTupleAndKeywords( + args, kwargs, + "lUO|OOOOO:bcp", + kwlist, + &opts.action, &opts.table, &opts.datafile, + &opts.formatfile, &opts.errorfile, &opts.firstrow, &opts.lastrow, &opts.maxerrors)) + return NULL; + + return _bcp_impl(self, opts); +} static char enter_doc[] = "__enter__() -> self."; @@ -1377,6 +1417,7 @@ static struct PyMethodDef Connection_methods[] = { "setdecoding", (PyCFunction)Connection_setdecoding, METH_VARARGS|METH_KEYWORDS, setdecoding_doc }, { "setencoding", (PyCFunction)Connection_setencoding, METH_VARARGS|METH_KEYWORDS, 0 }, { "set_attr", Connection_set_attr, METH_VARARGS, set_attr_doc }, + { "bcp", (PyCFunction)Connection_bcp, METH_VARARGS|METH_KEYWORDS, bcp_doc }, { "__enter__", Connection_enter, METH_NOARGS, enter_doc }, { "__exit__", Connection_exit, METH_VARARGS, exit_doc }, diff --git a/src/connection.h b/src/connection.h index c319c833..9067ccfc 100644 --- a/src/connection.h +++ b/src/connection.h @@ -39,6 +39,9 @@ struct Connection // to insert NULLs into binary columns. bool supports_describeparam; + // True if the driver supports BCP. + bool bcp_enabled; + // The column size of datetime columns, obtained from SQLGetInfo(), used to determine the datetime precision. int datetime_precision; diff --git a/src/pyodbc.pyi b/src/pyodbc.pyi index 685cebf1..1727ffa0 100644 --- a/src/pyodbc.pyi +++ b/src/pyodbc.pyi @@ -2,6 +2,7 @@ # ruff: noqa: E303, N802, N803 from __future__ import annotations from collections.abc import Generator, Iterable, Iterator, Sequence +from pathlib import Path from typing import Any, Callable, Final, Union @@ -294,7 +295,6 @@ SQL_UNION: int SQL_USER_NAME: int SQL_XOPEN_CLI_YEAR: int - # pyodbc-specific constants BinaryNull: Any # to distinguish binary NULL values from char NULL values SQLWCHAR_SIZE: int @@ -309,6 +309,10 @@ paramstyle: Final[str] = 'qmark' threadsafety: Final[int] = 1 version: Final[str] # not pep-0249 +# Bulk copy actions +BCP_IN: Final[int] = 1 +BCP_OUT: Final[int] = 2 + # read-write (not pep-0249) lowercase: bool = False native_uuid: bool = False @@ -511,6 +515,36 @@ class Connection: """Close the connection. Any uncommitted SQL statements will be rolled back.""" ... + # method for bulk import/export of data + + def bcp(self, + action: int, + table: str, + datafile: str | Path, + /, *, + formatfile: str | Path | None = None, + errorfile: str | Path | None = None, + firstrow: int | None = None, + lastrow: int | None = None, + maxerrors: int | None = None) -> int: + """Perform a bulk copy + + Arguments: + action: pyodbc.BCP_IN or pyodbc.BCP_OUT + table: required name of table to copy or populate + datafile: path to table data file + formatfile: custom specification for copying + errorfile: used to report rows which failed import + firstrow: used to skip past the first rows (1-based) + lastrow: tells bcp to stop after processing the row at this position + maxerrors: tells bcp to abort after detecting this many errors (default 10) + + Returns: + the number of rows copied + """ + ... + + class Cursor: """The class representing database cursors. Cursors are vehicles for executing SQL diff --git a/src/pyodbcmodule.cpp b/src/pyodbcmodule.cpp index ede13fe3..f8ea9a0f 100644 --- a/src/pyodbcmodule.cpp +++ b/src/pyodbcmodule.cpp @@ -311,7 +311,7 @@ static bool AllocateEnv() } } Py_DECREF(odbcversion); - + if (!SQL_SUCCEEDED(SQLSetEnvAttr(henv, SQL_ATTR_ODBC_VERSION, defaultVersion, sizeof(int)))) { PyErr_SetString(PyExc_RuntimeError, "Unable to set SQL_ATTR_ODBC_VERSION attribute."); @@ -1227,6 +1227,8 @@ PyMODINIT_FUNC PyInit_pyodbc() PyModule_AddObject(module, "BinaryNull", null_binary); PyModule_AddIntConstant(module, "SQLWCHAR_SIZE", sizeof(SQLWCHAR)); + PyModule_AddIntConstant(module, "BCP_IN", 1); + PyModule_AddIntConstant(module, "BCP_OUT", 2); if (!PyErr_Occurred()) { diff --git a/tests/bcp_character.fmt b/tests/bcp_character.fmt new file mode 100644 index 00000000..36128c64 --- /dev/null +++ b/tests/bcp_character.fmt @@ -0,0 +1,6 @@ +14.0 +4 +1 SQLCHAR 0 12 "\t" 1 i "" +2 SQLCHAR 0 40 "\t" 2 t SQL_Latin1_General_CP1_CI_AS +3 SQLCHAR 0 41 "\t" 3 n "" +4 SQLCHAR 0 11 "\n" 4 d "" diff --git a/tests/bcp_native.fmt b/tests/bcp_native.fmt new file mode 100644 index 00000000..a03651eb --- /dev/null +++ b/tests/bcp_native.fmt @@ -0,0 +1,6 @@ +14.0 +4 +1 SQLINT 1 4 "" 1 i "" +2 SQLNCHAR 2 40 "" 2 t SQL_Latin1_General_CP1_CI_AS +3 SQLNUMERIC 1 19 "" 3 n "" +4 SQLDATE 1 3 "" 4 d "" diff --git a/tests/sqlserver_test.py b/tests/sqlserver_test.py index 6feee9ec..de476527 100755 --- a/tests/sqlserver_test.py +++ b/tests/sqlserver_test.py @@ -8,6 +8,7 @@ from decimal import Decimal from datetime import date, time, datetime from functools import lru_cache +from pathlib import Path import pyodbc import pytest @@ -32,6 +33,7 @@ def connect(autocommit=False, attrs_before=None): DRIVER = connect().getinfo(pyodbc.SQL_DRIVER_NAME) +DRIVER_VERSION = tuple(int(n) for n in connect().getinfo(pyodbc.SQL_DRIVER_VER).split(".")) IS_FREEDTS = bool(re.search(r'tsodbc', DRIVER, flags=re.IGNORECASE)) IS_MSODBCSQL = bool(re.search(r'(msodbcsql|sqlncli|sqlsrv32\.dll)', DRIVER, re.IGNORECASE)) @@ -1656,6 +1658,101 @@ def test_sql_variant(cursor: pyodbc.Cursor): assert results[index] == expected_value +@pytest.mark.skipif(IS_FREEDTS, reason="the FreeTDS driver does not provide the bcp_* functions") +@pytest.mark.skipif(DRIVER_VERSION < (13, 2, 0), reason="earier drivers don't pass this test") +def test_bcp(cursor: pyodbc.Cursor): + """Exercise Connection.bcp()""" + + # Create two test tables and populate one of them. + sql = "create table {} (i int, t nvarchar(20), n numeric(5,2), d date)" + for table in ("t1", "t2"): + cursor.execute(sql.format(table)) + test_data = ( + (1, "turkey", Decimal('123.45'), date(2000, 1, 1)), + (2, "tofu", Decimal('234.56'), None), + (3, None, Decimal('345.67'), date(2000, 1, 3)), + (4, "Käse", None, date(2000, 1, 4)), + ) + cursor.executemany("insert into t1 values (?, ?, ?, ?)", test_data) + + # Make sure this fails for a non-existent table. + conn = cursor.connection + test_directory = Path(__file__).parent + datafile = test_directory / "test_native.bcp" + with pytest.raises(pyodbc.OperationalError): + conn.bcp(pyodbc.BCP_OUT, "__no_way_this_table_exists__", datafile) + + # Round-trip the table data without specifying a format. + count_out = conn.bcp(pyodbc.BCP_OUT, "t1", datafile) + count_in = conn.bcp(pyodbc.BCP_IN, "t2", datafile) + assert count_out == len(test_data) + assert count_in == len(test_data) + cursor.execute("select * from t2 order by 1") + round_tripped = tuple(tuple(row) for row in cursor.fetchall()) + assert round_tripped == test_data + + # Round-trip table data using an explicitly-provided native format. + cursor.execute("truncate table t2") + fmtfile = test_directory / "bcp_native.fmt" + count_in = conn.bcp(pyodbc.BCP_IN, "t2", datafile, formatfile=fmtfile) + assert count_in == len(test_data) + cursor.execute("select * from t2 order by 1") + round_tripped = tuple(tuple(row) for row in cursor.fetchall()) + assert round_tripped == test_data + + # Round-trip table data using an explicitly-provided character format. + # Also test passing str objects directly instead of pathlib.Path arguments. + cursor.execute("truncate table t2") + fmtfile = test_directory / "bcp_character.fmt" + datafile = test_directory / "test_character.bcp" + count_out = conn.bcp(pyodbc.BCP_OUT, "t1", str(datafile), formatfile=str(fmtfile)) + count_in = conn.bcp(pyodbc.BCP_IN, "t2", str(datafile), formatfile=str(fmtfile)) + assert count_out == len(test_data) + assert count_in == len(test_data) + cursor.execute("select * from t2 order by 1") + round_tripped = tuple(tuple(row) for row in cursor.fetchall()) + assert round_tripped == test_data + + # Test restoring a subset of the data. + cursor.execute("truncate table t2") + count_in = conn.bcp(pyodbc.BCP_IN, "t2", datafile, formatfile=fmtfile, firstrow=2, lastrow=3) + cursor.execute("select * from t2 order by 1") + round_tripped = tuple(tuple(row) for row in cursor.fetchall()) + assert round_tripped == test_data[1:3] + + # See what happens when we deliberately introduce errors. + cursor.execute("truncate table t2") + character_data = datafile.read_text() + baddatafile = test_directory / "test_baddata.bcp" + flawed_data = character_data.replace("123.45", "chicken") + with open(baddatafile, "w", newline="\n") as f: + f.write(flawed_data) + errorfile = test_directory / "bcp.errors" + count_in = conn.bcp(pyodbc.BCP_IN, "t2", baddatafile, formatfile=fmtfile, errorfile=errorfile) + assert count_in == len(test_data) - 1 + cursor.execute("select * from t2 order by 1") + round_tripped = tuple(tuple(row) for row in cursor.fetchall()) + assert round_tripped == test_data[1:] + expected = ( + "#@ Row 1, Column 3: Invalid character value for cast specification @#", + "1\tturkey\tchicken\t2000-01-01", + ) + error_lines = errorfile.read_text().splitlines() + for i, line in enumerate(error_lines): + assert line == expected[i] + + # Test override of the default error threshold: job should abort, leaving the table empty. + cursor.execute("truncate table t2") + flawed_data = flawed_data.replace("2000-01-04", "last Tuesday") + with open(baddatafile, "w", newline="\n") as f: + f.write(flawed_data) + with pytest.raises(pyodbc.OperationalError): + conn.bcp(pyodbc.BCP_IN, "t2", baddatafile, formatfile=fmtfile, maxerrors=1) + cursor.execute("select * from t2 order by 1") + round_tripped = tuple(tuple(row) for row in cursor.fetchall()) + assert not round_tripped + + def get_sqlserver_version(cursor: pyodbc.Cursor): """