Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,27 @@


import base64
from io import BytesIO
import logging
from io import BytesIO, StringIO

from markupsafe import Markup
from odoo import _, models
from odoo.exceptions import UserError
from openpyxl import Workbook, load_workbook

_logger = logging.getLogger(__name__)
try:
from csv import reader

import xlrd
except (OSError, ImportError) as err: # pragma: no cover
_logger.error(err)

try:
import chardet
except ImportError:
_logger.warning("chardet library not found, please install it from http://pypi.python.org/pypi/chardet")


class AccountStatementImport(models.TransientModel):
_name = "account.statement.import"
Expand Down Expand Up @@ -151,39 +165,84 @@ def import_file_button(self, wizard_data=None):
return result

def split_base64_excel(self, header_rows_count, rows_per_file_limit):
"""Split Excel file into multiple parts to avoid overloading the system.
Returns empty list if file is not a valid Excel or if split is not needed.
Only processes rows where the date column is not empty."""
"""Split Excel/CSV file into multiple parts."""
if not self.statement_file:
return []

output_base64_list = []
mapping = self.sheet_mapping_id
journal = self.env["account.journal"].browse(self.env.context.get("journal_id"))
currency_code = (journal.currency_id or journal.company_id.currency_id).name

try:
file_bytes = base64.b64decode(self.statement_file)
read_buffer = BytesIO(file_bytes)

# Try openpyxl (xlsx)
input_workbook = load_workbook(read_buffer)
input_worksheet = input_workbook.active
all_rows = list(input_worksheet.rows)
csv_or_xlsx = (input_workbook, input_worksheet)

except Exception:
return [self.statement_file]
try:
# Try xlrd (xls)
workbook = xlrd.open_workbook(
file_contents=file_bytes,
encoding_override=(mapping.file_encoding if mapping.file_encoding else None),
)
sheet = workbook.sheet_by_index(0)
csv_or_xlsx = (workbook, sheet)

except Exception:
# Try CSV
csv_options = {}
csv_delimiter = mapping._get_column_delimiter_character()
if csv_delimiter:
csv_options["delimiter"] = csv_delimiter
if mapping.quotechar:
csv_options["quotechar"] = mapping.quotechar

try:
decoded = file_bytes.decode(mapping.file_encoding or "utf-8")
except UnicodeDecodeError:
detected_encoding = chardet.detect(file_bytes).get("encoding", False)
if not detected_encoding:
raise UserError(self.env._("No valid encoding was found for the attached file")) from None
decoded = file_bytes.decode(detected_encoding)

csv_reader = reader(StringIO(decoded), **csv_options)
csv_or_xlsx = csv_reader
all_rows = [row for row in list(csv_or_xlsx) if any(cell for cell in row)]
parser = self.env["account.statement.import.sheet.parser"]

# Only parse header and rows for Excel files (when all_rows is not yet populated)
if not all_rows:
header = parser.parse_header(csv_or_xlsx, mapping)
columns = dict()
for column_name in parser._get_column_names():
columns[column_name] = parser._get_column_indexes(header, column_name, mapping)
data = csv_or_xlsx, self.statement_file
all_rows = parser._parse_rows(mapping, currency_code, data, columns)
else:
# For CSV files, we already have all_rows, convert list to iterator for parse_header
header = parser.parse_header(iter(all_rows), mapping)

all_rows = list(input_worksheet.rows)
if not all_rows:
return []

header_rows = all_rows[:header_rows_count]
data_rows = all_rows[header_rows_count:]

# Get the date column index from the sheet mapping using the parser's method
parser = self.env["account.statement.import.sheet.parser"]
header = parser.parse_header((input_workbook, input_worksheet), self.sheet_mapping_id)
try:
date_column_indexes = parser._get_column_indexes(header, "timestamp_column", self.sheet_mapping_id)
date_column_indexes = parser._get_column_indexes(header, "timestamp_column", mapping)
date_column_index = date_column_indexes[0] if date_column_indexes else None
except Exception as e:
raise UserError(_("Error importing bank statement: %s") % str(e))

# Filter out rows where the date column is empty
data_rows = self._filter_rows_with_date(data_rows, date_column_index)
# Filter rows with empty date
if date_column_index is not None:
data_rows = [r for r in data_rows if len(r) > date_column_index and r[date_column_index]]

start_row_index = 0
total_data_rows = len(data_rows)
Expand All @@ -196,20 +255,20 @@ def split_base64_excel(self, header_rows_count, rows_per_file_limit):
output_worksheet = output_workbook.active

for header_row in header_rows:
row_values = [cell.value for cell in header_row]
output_worksheet.append(row_values)
output_worksheet.append(header_row)

for data_row in rows_for_current_part:
row_values = [cell.value for cell in data_row]
output_worksheet.append(row_values)
output_worksheet.append(data_row)

write_buffer = BytesIO()
output_workbook.save(write_buffer)
output_bytes = write_buffer.getvalue()

base64_content = base64.b64encode(output_bytes).decode("utf-8")
output_base64_list.append(base64_content)

start_row_index = end_row_index

return output_base64_list

def _filter_rows_with_date(self, data_rows, date_column_index):
Expand Down
Loading