diff --git a/shared_tasks/dynalab/README.md b/shared_tasks/dynalab/README.md index f8b4d3a..7c2ccee 100644 --- a/shared_tasks/dynalab/README.md +++ b/shared_tasks/dynalab/README.md @@ -86,19 +86,44 @@ Note that the "signed" field will be obtained by calling `self.taskIO.sign_respo Also note that you can edit the [requirements.txt](./requirements.txt) file, based your model's specific dependencies. -Once you've implemented the handler you'll need to test it locally. +### Project installation and dependencies -First install `dynalab` using instructions from their [repo](https://github.com/facebookresearch/dynalab#installation). +#### 1. Create & activate a virtual environment +``` +python3 -m venv venv +source venv/bin/activate +``` +#### 2. Install Dynalab, fairseq & sentencepiece +``` +git clone https://github.com/facebookresearch/dynalab.git +cd dynalab +pip install -e . +pip install sentencepiece +pip install fairseq +``` +#### 3. Clone flores repo & put your model files inside the shared_tasks/dynalab folder +You can download the model files from https://dl.fbaipublicfiles.com/flores101/pretrained_models/flores101_mm100_175M.tar.gz; extract the contents of the archive and put them in "shared_tasks/dynalab". +``` +git clone https://github.com/facebookresearch/flores +cd flores/shared_tasks/dynalab +``` +Once you've implemented the handler you'll need to test it locally. The simplest test is to run `python handler.py`. You'll need to update the `local_test` function to use the task you want. Then you can move to running more involved tests using Dynalab. Afterwards, from this directory run: -`dynalab-cli init -n ` +``` +dynalab-cli init -n --model-checkpoint -t --model-files + +example: + +dynalab-cli init -n asimafrican --model-checkpoint model.pt -t flores_african --model-files "model_generation.json","dict.txt","sentencepiece.bpe.model" +``` Note that the model name needs to be lower-kebab-case. -Chose the track you want to apply to: "flores_small1", "flores_small2" or "flores_full". 
+Choose the track you want to apply to: "flores_african". Note that the input format is same for all the tracks. Then follow the prompt instruction and point to your model path, handler path ... diff --git a/shared_tasks/dynalab/handler.py b/shared_tasks/dynalab/handler.py index b299904..eda6d38 100644 --- a/shared_tasks/dynalab/handler.py +++ b/shared_tasks/dynalab/handler.py @@ -1,28 +1,31 @@ # Copyright (c) Facebook, Inc. and its affiliates. +import collections import json import logging -import time import os +import re +import sys +import time from pathlib import Path +from typing import NamedTuple +sys.path.append("/home/model-server/code/fairseq") import fairseq.checkpoint_utils import sentencepiece import torch -from typing import NamedTuple from dynalab.handler.base_handler import BaseDynaHandler -from dynalab.tasks.flores_small1 import TaskIO +from dynalab.tasks.task_io import TaskIO +from fairseq.data import data_utils from fairseq.sequence_generator import SequenceGenerator from fairseq.tasks.translation import TranslationConfig, TranslationTask -from fairseq.data import data_utils logger = logging.getLogger(__name__) logger.setLevel(logging.INFO) -# Tell Torchserve to let use do the deserialization +# Tell Torchserve to let us do the deserialization os.environ["TS_DECODE_INPUT_REQUEST"] = "false" - def mapping(languages: str) -> dict: return dict( tuple(pair.split(":")) @@ -42,10 +45,53 @@ def mapping(languages: str) -> dict: pan:pa,pol:pl,por:pt,pus:ps,ron:ro,rus:ru,slk:sk,slv:sl,sna:sn,snd:sd, som:so,spa:es,srp:sr,swe:sv,swh:sw,tam:ta,tel:te,tgk:tg,tgl:tl,tha:th, tur:tr,ukr:uk,umb:umb,urd:ur,uzb:uz,vie:vi,wol:wo,xho:xh,yor:yo,zho_simp:zh, -zho_trad:zh,zul:zu +zho_trad:zh,zul:zu,fuv:ff """ ) +QUOTES = re.compile(r'"|,,|\'\'|``|‟|“|”') +HYPHEN = re.compile(r" -\s*- ") +STARTQUOTE = re.compile(r'(^|[ ({\[])("|,,|\'\'|``)') +ENDQUOTE = re.compile(r'("|\'\'|‟)($|[ ,.?!:;)}\]])') +NUMERALS = re.compile(r"([\d]+[\d\-\.%\,:]*)") +LATIN = 
re.compile(r"([a-zA-Z’\'@]+[a-zA-Z’\'@_:\-]*)") +SPACE = re.compile(r"\s+") +PUNCT = re.compile(r"\s([\.)”。])") +PUNCT2 = re.compile(r"([(“])\s") +COMMA1 = re.compile(r"(\D),") +COMMA2 = re.compile(r",(\D)") +# Don't replace multiple dots +DOT = re.compile(r"(? int: @@ -129,7 +177,6 @@ def preprocess_one(self, sample) -> dict: """ preprocess data into a format that the model can do inference on """ - # TODO: this doesn't seem to produce good results. wrong EOS / BOS ? tokens = self.tokenize(sample["sourceText"]) src_token = self.lang_token(sample["sourceLanguage"]) tgt_token = self.lang_token(sample["targetLanguage"]) @@ -189,38 +236,53 @@ def postprocess(self, inference_output, samples: list) -> list: self.vocab.string(self.strip_pad(sentence), "sentencepiece") for sentence in inference_output ] - return [ - # Signing required by dynabench, don't remove. - self.taskIO.sign_response( - {"id": sample["uid"], "translatedText": translation}, - sample, - ) + responses = [ + { + "id": sample["uid"], + "translatedText": translation, + } for translation, sample in zip(translations, samples) ] + # Signing required by dynabench, don't remove. + for response, sample in zip(responses, samples): + self.taskIO.sign_response(response, sample) + return responses + + def accepts(self, sample) -> bool: + return sample["sourceLanguage"] in ISO2M100 and sample["targetLanguage"] in ISO2M100 + + def ignore_sample(self, sample) -> dict: + r = {"id": sample["uid"], "translatedText": ""} + self.taskIO.sign_response(r, sample) + return r _service = Handler() -def deserialize(torchserve_data: list) -> list: +def deserialize(torchserve_data: list) -> tuple: samples = [] - for torchserve_sample in torchserve_data: + sample2batch = {} + for batch_id, torchserve_sample in enumerate(torchserve_data): data = torchserve_sample["body"] # In case torchserve did the deserialization for us. 
if isinstance(data, dict): + sample2batch[data["uid"]] = batch_id samples.append(data) elif isinstance(data, (bytes, bytearray)): lines = data.decode("utf-8").splitlines() for i, l in enumerate(lines): try: - samples.append(json.loads(l)) + sample = json.loads(l) + sample2batch[sample["uid"]] = batch_id + samples.append(sample) except Exception as e: logging.error(f"Couldn't deserialize line {i}: {l}") logging.exception(e) else: logging.error(f"Unexpected payload: {data}") - return samples + return samples, len(torchserve_data), sample2batch def handle_mini_batch(service, samples): @@ -252,55 +314,105 @@ def handle(torchserve_data, context): return None start_time = time.time() - all_samples = deserialize(torchserve_data) + all_samples, num_batches, sample2batch = deserialize(torchserve_data) n = len(all_samples) logger.info( f"Deserialized a batch of size {n} ({n/(time.time()-start_time):.2f} samples / s)" ) - # Adapt this to your model. The GPU has 16Gb of RAM. - batch_size = 128 + # Adapt this to your model. The GPU has 11Gb of RAM. 
+ batch_size = 96 results = [] samples = [] for i, sample in enumerate(all_samples): + if not _service.accepts(sample): + results.append(_service.ignore_sample(sample)) + continue + samples.append(sample) - if len(samples) < batch_size and i + 1 < n: + if len(samples) < batch_size: continue results.extend(handle_mini_batch(_service, samples)) samples = [] + if len(samples) > 0: + results.extend(handle_mini_batch(_service, samples)) + samples = [] assert len(results) + + return wrap_as_batches(results, num_batches, sample2batch) + + +def wrap_as_batches(results, num_batches, sample2batch): start_time = time.time() - response = "\n".join(json.dumps(r, indent=None, ensure_ascii=False) for r in results) + n = len(sample2batch) + if num_batches == 1: + response = "\n".join( + json.dumps(r, indent=None, ensure_ascii=False) for r in results + ) + logger.info( + f"Serialized a batch of size {n} ({n/(time.time()-start_time):.2f} samples / s)" + ) + return [response] + + batch2results = collections.defaultdict(list) + for result in results: + batch_id = sample2batch[result["id"]] + batch2results[batch_id].append(result) + + responses = [] + for batch_id in sorted(batch2results.keys()): + response = "\n".join( + json.dumps(r, indent=None, ensure_ascii=False) + for r in batch2results[batch_id] + ) + responses.append(response) logger.info( f"Serialized a batch of size {n} ({n/(time.time()-start_time):.2f} samples / s)" ) - return [response] + return responses + + +def mk_sample(text, tgt, i=0): + return { + "uid": f"sample{i}", + "sourceText": text, + "sourceLanguage": "eng", + "targetLanguage": tgt, + } + + +class Context(NamedTuple): + system_properties: dict + manifest: dict + + def _call_handler(self, data): + need_wrap = not isinstance(data, list) + if need_wrap: + data = [data] + bin_data = b"\n".join(json.dumps(d).encode("utf-8") for d in data) + torchserve_data = [{"body": bin_data}] + responses = handle(torchserve_data, self) + parsed_responses = [ + 
json.loads(l)["translatedText"] for l in responses[0].splitlines() + ] + if need_wrap: + return parsed_responses[0] + else: + return parsed_responses def local_test(): - from dynalab.tasks import flores_small1 - - bin_data = b"\n".join(json.dumps(d).encode("utf-8") for d in flores_small1.data) - torchserve_data = [{"body": bin_data}] - - manifest = {"model": {"serializedFile": "model.pt"}} - system_properties = {"model_dir": ".", "gpu_id": None} - - class Context(NamedTuple): - system_properties: dict - manifest: dict + from dynalab.tasks.task_io import TaskIO + manifest = {"model": {"serializedFile": "checkpoint.pt"}} + system_properties = {"model_dir": ".", "gpu_id": 0} ctx = Context(system_properties, manifest) - batch_responses = handle(torchserve_data, ctx) - print(batch_responses) - - single_responses = [ - handle([{"body": json.dumps(d).encode("utf-8")}], ctx)[0] - for d in flores_small1.data - ] - assert batch_responses == ["\n".join(single_responses)] + for k, testcase in globals().items(): + if k.startswith("test_") and callable(testcase): + logger.info(f"[TESTCASE] {k}") + testcase(ctx) if __name__ == "__main__": - local_test() + local_test() \ No newline at end of file diff --git a/shared_tasks/dynalab/model_generation.json b/shared_tasks/dynalab/model_generation.json index 2882127..041a012 100644 --- a/shared_tasks/dynalab/model_generation.json +++ b/shared_tasks/dynalab/model_generation.json @@ -1,7 +1,12 @@ { "dummy": false, - "beam_size": 1, - "max_len_a": 1.3, - "max_len_b": 5, - "min_len": 5 + "generation": { + "beam_size": 1, + "max_len_a": 3, + "max_len_b": 5, + "min_len": 1, + "unk_penalty": 100.0 + }, + "sentencepiece": "wiki.spm" } + diff --git a/shared_tasks/dynalab/requirements.txt b/shared_tasks/dynalab/requirements.txt index 67cab07..65966c9 100644 --- a/shared_tasks/dynalab/requirements.txt +++ b/shared_tasks/dynalab/requirements.txt @@ -1,3 +1,3 @@ sentencepiece -# Replace this by 1.0.0 once it's out 
-git+git://github.com/pytorch/fairseq.git@1305008e#egg=fairseq +fairseq==0.12.2 + diff --git a/shared_tasks/dynalab/setup_config.json b/shared_tasks/dynalab/setup_config.json index 4809edf..1205d24 100644 --- a/shared_tasks/dynalab/setup_config.json +++ b/shared_tasks/dynalab/setup_config.json @@ -1,5 +1,5 @@ { - "task": "flores_small1", + "task": "flores_african", "checkpoint": "model.pt", "handler": "handler.py", "requirements": true,