Skip to content

Commit f35935c

Browse files
committed
[IMP] start extraction of chunk processing \o/
1 parent 6f9aad4 commit f35935c

22 files changed

+361
-150
lines changed

chunk_processing/__init__.py

+2
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
from . import models
2+
from . import components

chunk_processing/__manifest__.py

+28
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
# Copyright 2021 Akretion (https://www.akretion.com).
2+
# @author Sébastien BEAU <[email protected]>
3+
# License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl).
4+
5+
6+
{
7+
"name": "Chunk Processing",
8+
"summary": "Base module for processing chunk",
9+
"version": "14.0.1.0.0",
10+
"category": "Uncategorized",
11+
"website": "https://github.com/shopinvader/pattern-import-export",
12+
"author": " Akretion",
13+
"license": "AGPL-3",
14+
"application": False,
15+
"installable": True,
16+
"external_dependencies": {
17+
"python": [],
18+
"bin": [],
19+
},
20+
"depends": [
21+
"queue_job",
22+
"component",
23+
],
24+
"data": [
25+
"views/chunk_item_view.xml",
26+
],
27+
"demo": [],
28+
}
+3
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
from . import processor
2+
from . import splitter
3+
from . import splitter_json
+14
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
# Copyright 2021 Akretion (https://www.akretion.com).
2+
# @author Sébastien BEAU <[email protected]>
3+
# License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl).
4+
5+
6+
from odoo.addons.component.core import AbstractComponent
7+
8+
9+
class ChunkProcessor(AbstractComponent):
10+
_name = "chunk.processor"
11+
_collection = "chunk.item"
12+
13+
def run(self):
14+
raise NotImplementedError
+50
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
# Copyright 2021 Akretion (https://www.akretion.com).
2+
# @author Sébastien BEAU <[email protected]>
3+
# License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl).
4+
5+
from odoo.addons.component.core import AbstractComponent
6+
7+
8+
class ChunkSplitter(AbstractComponent):
9+
_name = "chunk.splitter"
10+
_collection = "chunk.group"
11+
12+
def _parse_data(self, data):
13+
raise NotImplementedError
14+
15+
def _prepare_chunk(self, start_idx, stop_idx, data):
16+
return {
17+
"start_idx": start_idx,
18+
"stop_idx": stop_idx,
19+
"data": data,
20+
"nbr_item": len(data),
21+
"state": "pending",
22+
"group_id": self.collection.id,
23+
}
24+
25+
def _should_create_chunk(self, items, next_item):
26+
"""Customise this code if you want to add some additionnal
27+
item after reaching the limit"""
28+
return len(items) > self.collection.chunk_size
29+
30+
def _create_chunk(self, start_idx, stop_idx, data):
31+
vals = self._prepare_chunk(start_idx, stop_idx, data)
32+
chunk = self.env["chunk.item"].create(vals)
33+
# we enqueue the chunk in case of multi process of if it's the first chunk
34+
if self.collection.process_multi or len(self.collection.item_ids) == 1:
35+
chunk.with_delay(priority=self.collection.job_priority).run()
36+
return chunk
37+
38+
def run(self, data):
39+
items = []
40+
start_idx = 1
41+
previous_idx = None
42+
for idx, item in self._parse_data(data):
43+
if self._should_create_chunk(items, item):
44+
self._create_chunk(start_idx, previous_idx, items)
45+
items = []
46+
start_idx = idx
47+
items.append((idx, item))
48+
previous_idx = idx
49+
if items:
50+
self._create_chunk(start_idx, idx, items)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
# Copyright 2021 Akretion (https://www.akretion.com).
2+
# @author Sébastien BEAU <[email protected]>
3+
# License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl).
4+
5+
import json
6+
7+
from odoo.addons.component.core import Component
8+
9+
10+
class ChunkSplitterJson(Component):
11+
_inherit = "chunk.splitter"
12+
_name = "chunk.splitter.json"
13+
_usage = "json"
14+
15+
def _parse_data(self, data):
16+
items = json.loads(data.decode("utf-8"))
17+
for idx, item in enumerate(items):
18+
yield idx + 1, item

chunk_processing/models/__init__.py

+2
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
from . import chunk_item
2+
from . import chunk_group
+69
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
# Copyright 2021 Akretion (https://www.akretion.com).
2+
# @author Sébastien BEAU <[email protected]>
3+
# License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl).
4+
5+
6+
from odoo import _, api, fields, models
7+
8+
9+
class ChunkGroup(models.Model):
10+
_inherit = "collection.base"
11+
_name = "chunk.group"
12+
13+
item_ids = fields.One2many("chunk.item", "group_id", "Item")
14+
process_multi = fields.Boolean()
15+
job_priority = fields.Integer(default=20)
16+
chunk_size = fields.Integer(default=500, help="Define the size of the chunk")
17+
progress = fields.Float(compute="_compute_stat")
18+
date_done = fields.Datetime()
19+
data_format = fields.Selection(
20+
[
21+
("json", "Json"),
22+
("xml", "XML"),
23+
]
24+
)
25+
state = fields.Selection(
26+
[("pending", "Pending"), ("failed", "Failed"), ("done", "Done")],
27+
default="pending",
28+
)
29+
info = fields.Char()
30+
nbr_error = fields.Integer(compute="_compute_stat")
31+
nbr_success = fields.Integer(compute="_compute_stat")
32+
apply_on_model = fields.Char()
33+
usage = fields.Char()
34+
35+
@api.depends("item_ids.nbr_error", "item_ids.nbr_success")
36+
def _compute_stat(self):
37+
for record in self:
38+
record.nbr_error = sum(record.mapped("item_ids.nbr_error"))
39+
record.nbr_success = sum(record.mapped("item_ids.nbr_success"))
40+
todo = sum(record.mapped("item_ids.nbr_item"))
41+
if todo:
42+
record.progress = (record.nbr_error + record.nbr_success) * 100.0 / todo
43+
else:
44+
record.progress = 0
45+
46+
def _get_data(self):
47+
raise NotImplementedError
48+
49+
def split_in_chunk(self):
50+
"""Split Group into Chunk"""
51+
# purge chunk in case of retring a job
52+
self.item_ids.unlink()
53+
try:
54+
data = self._get_data()
55+
with self.work_on(self._name) as work:
56+
splitter = work.component(usage=self.data_format)
57+
splitter.run(data)
58+
except Exception as e:
59+
self.state = "failed"
60+
self.info = _("Failed to create the chunk: %s") % e
61+
return True
62+
63+
def set_done(self):
64+
for record in self:
65+
if record.nbr_error:
66+
record.state = "failed"
67+
else:
68+
record.state = "done"
69+
record.date_done = fields.Datetime.now()

pattern_import_export/models/pattern_chunk.py renamed to chunk_processing/models/chunk_item.py

+24-28
Original file line numberDiff line numberDiff line change
@@ -5,14 +5,15 @@
55
from odoo import fields, models
66

77

8-
class PatternChunk(models.Model):
9-
_name = "pattern.chunk"
10-
_description = "Pattern Chunk"
8+
class ChunkItem(models.Model):
9+
_inherit = "collection.base"
10+
_name = "chunk.item"
11+
_description = "Chunk Item"
1112
_order = "start_idx"
1213
_rec_name = "start_idx"
1314

14-
pattern_file_id = fields.Many2one(
15-
"pattern.file", "Pattern File", required=True, ondelete="cascade"
15+
group_id = fields.Many2one(
16+
"chunk.group", "Chunk Group", required=True, ondelete="cascade"
1617
)
1718
start_idx = fields.Integer()
1819
stop_idx = fields.Integer()
@@ -32,33 +33,31 @@ class PatternChunk(models.Model):
3233
]
3334
)
3435

35-
def run_import(self):
36-
model = self.pattern_file_id.pattern_config_id.model_id.model
37-
res = (
38-
self.with_context(pattern_config={"model": model, "record_ids": []})
39-
.env[model]
40-
.load([], self.data)
41-
)
42-
self.write(self._prepare_chunk_result(res))
43-
config = self.pattern_file_id.pattern_config_id
44-
priority = config.job_priority
45-
if not config.process_multi:
36+
def manual_run(self):
37+
""" Run the import without try/except, easier for debug """
38+
return self._run()
39+
40+
def _run(self):
41+
with self.work_on(self.group_id.apply_on_model) as work:
42+
processor = work.component(usage=self.group_id.usage)
43+
processor.run()
44+
if not self.group_id.process_multi:
4645
next_chunk = self.get_next_chunk()
4746
if next_chunk:
48-
next_chunk.with_delay(priority=priority).run()
47+
next_chunk.with_delay(priority=self.group_id.job_priority).run()
4948
else:
5049
self.with_delay(priority=5).check_last()
5150
else:
5251
self.with_delay(priority=5).check_last()
5352

5453
def run(self):
55-
"""Process Import of Pattern Chunk"""
54+
"""Process Chunk Item in a savepoint"""
5655
cr = self.env.cr
5756
try:
5857
self.state = "started"
5958
cr.commit() # pylint: disable=invalid-commit
6059
with cr.savepoint():
61-
self.run_import()
60+
self._run()
6261
except Exception as e:
6362
self.write(
6463
{
@@ -70,6 +69,7 @@ def run(self):
7069
self.with_delay().check_last()
7170
return "OK"
7271

72+
# TODO move this in pattern-import
7373
def _prepare_chunk_result(self, res):
7474
# TODO rework this part and add specific test case
7575
nbr_error = len(res["messages"])
@@ -98,23 +98,19 @@ def _prepare_chunk_result(self, res):
9898
}
9999

100100
def get_next_chunk(self):
101-
return self.search(
102-
[
103-
("pattern_file_id", "=", self.pattern_file_id.id),
104-
("state", "=", "pending"),
105-
],
106-
limit=1,
101+
return fields.first(
102+
self.group_id.item_ids.filtered(lambda s: s.state == "pending")
107103
)
108104

109105
def is_last_job(self):
110-
return not self.pattern_file_id.chunk_ids.filtered(
106+
return not self.group_id.item_ids.filtered(
111107
lambda s: s.state in ("pending", "started")
112108
)
113109

114110
def check_last(self):
115111
"""Check if all chunk have been processed"""
116112
if self.is_last_job():
117-
self.pattern_file_id.set_import_done()
118-
return "Pattern file is done"
113+
self.group_id.set_done()
114+
return "Chunk group is done"
119115
else:
120116
return "There is still some running chunk"

pattern_import_export/views/pattern_chunk.xml renamed to chunk_processing/views/chunk_item_view.xml

+5-5
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
<?xml version="1.0" encoding="UTF-8" ?>
22
<odoo>
33

4-
<record id="pattern_chunk_view_tree" model="ir.ui.view">
5-
<field name="model">pattern.chunk</field>
4+
<record id="chunk_item_view_tree" model="ir.ui.view">
5+
<field name="model">chunk.item</field>
66
<field name="arch" type="xml">
77
<tree string="Chunk">
88
<field name="start_idx" />
@@ -14,12 +14,12 @@
1414
</field>
1515
</record>
1616

17-
<record id="pattern_chunk_view_form" model="ir.ui.view">
18-
<field name="model">pattern.chunk</field>
17+
<record id="chunk_item_view_form" model="ir.ui.view">
18+
<field name="model">chunk.item</field>
1919
<field name="arch" type="xml">
2020
<form string="Chunk">
2121
<header>
22-
<button name="run_import" type="object" string="Force manual run" />
22+
<button name="manual_run" type="object" string="Force manual run" />
2323
</header>
2424
<sheet>
2525
<group>

pattern_import_export/__init__.py

+1
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,3 @@
11
from . import models
22
from . import wizard
3+
from . import components

pattern_import_export/__manifest__.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
"web_notify",
1616
"base_sparse_field_list_support",
1717
"base_sparse_field",
18+
"chunk_processing",
1819
],
1920
"data": [
2021
"security/pattern_security.xml",
@@ -23,11 +24,10 @@
2324
"wizard/import_pattern_wizard.xml",
2425
"views/pattern_config.xml",
2526
"views/pattern_file.xml",
26-
"views/pattern_chunk.xml",
2727
"views/menuitems.xml",
2828
"views/templates.xml",
2929
"data/queue_job_channel_data.xml",
30-
"data/queue_job_function_data.xml",
30+
# "data/queue_job_function_data.xml",
3131
],
3232
"demo": ["demo/demo.xml"],
3333
"installable": True,
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
from . import processor
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
# Copyright 2021 Akretion (https://www.akretion.com).
2+
# @author Sébastien BEAU <[email protected]>
3+
# License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl).
4+
5+
from odoo.addons.component.core import Component
6+
7+
8+
class ChunkProcessorPattern(Component):
9+
_inherit = "chunk.processor"
10+
_name = "chunk.processor.pattern"
11+
_usage = "pattern.import"
12+
13+
def run(self):
14+
model = self.collection.group_id.apply_on_model
15+
res = (
16+
self.env[model]
17+
.with_context(pattern_config={"model": model, "record_ids": []})
18+
.load([], self.collection.data)
19+
)
20+
self.collection.write(self._prepare_chunk_result(res))
21+
22+
def _prepare_chunk_result(self, res):
23+
# TODO rework this part and add specific test case
24+
nbr_error = len(res["messages"])
25+
nbr_success = max(self.collection.nbr_item - nbr_error, 0)
26+
27+
# case where error are not return and record are not imported
28+
nbr_imported = len(res.get("ids") or [])
29+
if nbr_success > nbr_imported:
30+
nbr_success = nbr_imported
31+
nbr_error = self.collection.nbr_item - nbr_imported
32+
33+
if nbr_error:
34+
state = "failed"
35+
else:
36+
state = "done"
37+
result = self.env["ir.qweb"]._render(
38+
"pattern_import_export.format_message", res
39+
)
40+
return {
41+
"record_ids": res.get("ids"),
42+
"messages": res.get("messages"),
43+
"result_info": result,
44+
"state": state,
45+
"nbr_success": nbr_success,
46+
"nbr_error": nbr_error,
47+
}

0 commit comments

Comments
 (0)