diff --git a/README.md b/README.md index bb388702..990138a1 100644 --- a/README.md +++ b/README.md @@ -74,47 +74,3 @@ See details about other script features using help arg: ```bash python3 compare_models.py --help ``` - -# README from the original github repo -Due to the reason that the original repo do not allows to work with any other LLM from the box excluding ones from the paper it was forked and strongly refactored. But with regards to the authors of the initial work the text from their README is stayed here. - -## LMentry - -This repository contains the LMentry benchmark from [LMentry: A Language Model Benchmark of Elementary Language Tasks](https://arxiv.org/pdf/2211.02069.pdf), as well as the code to evaluate it. - -For any questions, feel free to open a GitHub issue or to contact us at avia.efrat@gmail.com :blush: - -### Getting the Data -Simply clone the repo: -```shell -git clone https://github.com/aviaefrat/lmentry.git -``` -The data is in the `data` directory. - -### Generating Predictions -We provide functions for generating predictions with Hugging Face and OpenAI models (see below), but you can generate predictions in any method of your choosing. - -For Hugging Face and OpenAI models, you can use the -`generate_all_hf_predictions` and -`generate_all_openai_predictions` functions from `predict.py`. These are what we used in our experiments. - -### Evaluation - -The easiest and recommended way is to use `evalutate.py`: -```shell -python -m lmentry.evaluate -``` -_Don't forget to activate the lmentry environment (created from `environment.yml`) beforehand._ -Using the `--num-procs=N` optional argument will score the predictions much faster. -`evalutate.py` will also automatically create files analyzing the results in a separate `results` dir. - -To use `evalutate.py`, the predictions must follow the same structure of [lmentry_predictions.zip](https://drive.google.com/file/d/1Ex1fde_PEzhIU5ctGkOJsacaGNnQeqke/view?usp=sharing) (if you used our functions from `predict.py`, your predictions already follow this structure): -1. The top-level directory should be named `predictions`. -2. `predictions` needs to contain exactly 41 directories, named after the 41 files in `data` (the 25 task names + the 16 files for the argument content robustness). -3. Each of the 41 task predictions directories should contain a prediction file for each model you want to evaluate. For example, to evaluate the predictions of a model named `my-model`, each of the 41 directories should contain a file named `my-model.json` with the model's predictions for this task. -4. Each predictions file should contain values in the form `"": {"prediction": },` where the `id`s correspond to those in the task's file in `data`. - -### Reproducing the Results from the Paper -1. Clone the repository. -2. Unzip `lmentry_predictions.zip` into the top-level lmentry directory. -3. run `evaluate.py` (preferably with a not-very-small value for `--num-procs`, as there are 656 files to score...) diff --git a/README_OLD.md b/README_OLD.md new file mode 100644 index 00000000..988083a8 --- /dev/null +++ b/README_OLD.md @@ -0,0 +1,42 @@ +Due to the reason that the original repo do not allows to work with any other LLM and tasks from the box excluding ones from the paper it was forked and strongly refactored. But with regards to the authors of the initial work the text from their README is stayed here. + +# LMentry + +This repository contains the LMentry benchmark from [LMentry: A Language Model Benchmark of Elementary Language Tasks](https://arxiv.org/pdf/2211.02069.pdf), as well as the code to evaluate it. + +For any questions, feel free to open a GitHub issue or to contact us at avia.efrat@gmail.com :blush: + +## Getting the Data +Simply clone the repo: +```shell +git clone https://github.com/aviaefrat/lmentry.git +``` +The data is in the `data` directory. + +## Generating Predictions +We provide functions for generating predictions with Hugging Face and OpenAI models (see below), but you can generate predictions in any method of your choosing. + +For Hugging Face and OpenAI models, you can use the +`generate_all_hf_predictions` and +`generate_all_openai_predictions` functions from `predict.py`. These are what we used in our experiments. + +## Evaluation + +The easiest and recommended way is to use `evalutate.py`: +```shell +python -m lmentry.evaluate +``` +_Don't forget to activate the lmentry environment (created from `environment.yml`) beforehand._ +Using the `--num-procs=N` optional argument will score the predictions much faster. +`evalutate.py` will also automatically create files analyzing the results in a separate `results` dir. + +To use `evalutate.py`, the predictions must follow the same structure of [lmentry_predictions.zip](https://drive.google.com/file/d/1Ex1fde_PEzhIU5ctGkOJsacaGNnQeqke/view?usp=sharing) (if you used our functions from `predict.py`, your predictions already follow this structure): +1. The top-level directory should be named `predictions`. +2. `predictions` needs to contain exactly 41 directories, named after the 41 files in `data` (the 25 task names + the 16 files for the argument content robustness). +3. Each of the 41 task predictions directories should contain a prediction file for each model you want to evaluate. For example, to evaluate the predictions of a model named `my-model`, each of the 41 directories should contain a file named `my-model.json` with the model's predictions for this task. +4. Each predictions file should contain values in the form `"": {"prediction": },` where the `id`s correspond to those in the task's file in `data`. + +## Reproducing the Results from the Paper +1. Clone the repository. +2. Unzip `lmentry_predictions.zip` into the top-level lmentry directory. +3. run `evaluate.py` (preferably with a not-very-small value for `--num-procs`, as there are 656 files to score...) diff --git a/compare_models.py b/compare_models.py index dbb461a3..713b590c 100644 --- a/compare_models.py +++ b/compare_models.py @@ -4,7 +4,7 @@ from lmentry.constants import PREDICTIONS_ROOT_DIR, TASKS_DATA_DIR, RESULTS_DIR, DEFAULT_MAX_LENGTH from tasks.task_utils import get_tasks_names, task_groups, all_tasks -from lmentry.predict import generate_all_hf_predictions +from lmentry.predict import PredictorFactory from lmentry.analysis.accuracy import flexible_scoring from lmentry.analysis.comparison import create_per_task_accuracy_comparison_csv from lmentry.model_manager import get_short_model_names @@ -18,7 +18,7 @@ def parse_arguments(): parser.add_argument("-r", "--ref_model_name", type=str, default="vicuna-7b-v1-3-q0f16", help="Name of reference model. It is assumed that the model is original, " "uses high-precision data type and has better accuracy") - parser.add_argument('-p', '--probe_model_names', nargs="+", type=str, default="vicuna-7b-v1-3-q4f16_0", + parser.add_argument('-p', '--probe_model_names', nargs="+", type=str, default=["vicuna-7b-v1-3-q4f16_0"], help=f"Names of probe models. If the number of the probe models is bigger than one " "it iteratively compares the reference model with each from the list.") parser.add_argument('-t', '--task_names', nargs="+", type=str, default=get_tasks_names("7b"), @@ -26,6 +26,8 @@ def parse_arguments(): f"Task set names should be from the list: {task_groups.keys()}. " f"Task names should be from the list: {all_tasks}. " "It tries to analyze 7b-model sensetive task set by default") + parser.add_argument("-pt", "--predictor_type", type=str, default="hf", + help=f"Type of predictor, can be chosen from the list: {PredictorFactory.predictors_map.keys()}") parser.add_argument('-d', '--device', type=str, default="cuda", help="Device name. It is needed and used by mlc model only during predictions") parser.add_argument('-b', '--batch_size', type=int, default=100, @@ -42,10 +44,14 @@ def parse_arguments(): type=int, help="The number of processes to use when scoring the predictions. " "Can be up to the number of models you want to evaluate * 41.") - parser.add_argument("-f", "--forced_scoring", action="store_true", default=False, - help="If scoring has been done for specified task it skips it. This flag allows to redo ready scoring") + parser.add_argument('-fp', '--force_predict', action="store_true", default=False, + help="Whether to force regenerate predictions.") + parser.add_argument("-fs", "--force_scoring", action="store_true", default=False, + help="If scoring has been done for specified task it will be skiped. This flag allows to redo ready scoring") parser.add_argument("-c", "--certainty", action="store_true", default=False, help="Conservative accuracy evaluation. The answer is considered correct only if it is absolutely certain") + parser.add_argument('-uv', '--use_vllm', action='store_true', default=False, + help="Whether to use vLLM inference.") return parser.parse_args() @@ -61,41 +67,55 @@ def main(): args = parse_arguments() task_names = get_tasks_names(args.task_names) - for probe_model_name in tqdm(args.probe_model_names, desc="Models comparison"): - model_names = get_short_model_names([args.ref_model_name, probe_model_name]) - print(f"Models {model_names[0]} and {model_names[1]} are compared") - - # Predict specified tasks for given models - # Reference model - logging.info(f"Prediction for {model_names[0]} model starts") - generate_all_hf_predictions( - task_names=task_names, - model_name=args.ref_model_name, + # Init predictor + predictor = PredictorFactory.get_predictor( + name=args.predictor_type, max_length=args.max_length, batch_size=args.batch_size, - device=args.device, samples_num=args.samples_num, - ) - logging.info(f"Prediction for {model_names[0]} model finished") - # Probe_model - logging.info(f"Prediction for {model_names[1]} model starts") - generate_all_hf_predictions( + ) + + # Predict specified tasks for reference model + logging.info(f"Prediction for {args.ref_model_name} model starts") + predictor.generate( + task_names=task_names, + model_name=args.ref_model_name, + device=args.device, + use_vllm=args.use_vllm, + force_predict=args.force_predict, + ) + logging.info(f"Prediction for {args.ref_model_name} model finished") + + # Score reference model + flexible_scoring( + task_names=task_names, + model_names=[args.ref_model_name], + num_processes=args.num_procs, + forced_scoring=args.force_predict or args.force_scoring, + ) + + for probe_model_name in tqdm(args.probe_model_names, desc="Models comparison"): + print(f"Models {args.ref_model_name} and {probe_model_name} are compared") + + # Predict specified tasks for probe model + logging.info(f"Prediction for {probe_model_name} model starts") + predictor.generate( task_names=task_names, model_name=probe_model_name, - max_length=args.max_length, - batch_size=args.batch_size, device=args.device, - samples_num=args.samples_num, + use_vllm=args.use_vllm, + force_predict=args.force_predict, ) - logging.info(f"Prediction for {model_names[1]} model finished") + logging.info(f"Prediction for {probe_model_name} model finished") flexible_scoring( task_names=task_names, - model_names=model_names, + model_names=[probe_model_name], num_processes=args.num_procs, forced_scoring=args.forced_scoring, ) + model_names = get_short_model_names([args.ref_model_name, probe_model_name]) create_per_task_accuracy_comparison_csv(model_names=model_names, task_names=task_names, certainty=args.certainty) diff --git a/lmentry/input_preprocessor.py b/lmentry/input_preprocessor.py new file mode 100644 index 00000000..f21dd376 --- /dev/null +++ b/lmentry/input_preprocessor.py @@ -0,0 +1,7 @@ +class PromptPreprocessor: + + def __init__(self) -> None: + pass + + def preprocess(self, raw_input_prompts): + return raw_input_prompts diff --git a/lmentry/output_postprocessor.py b/lmentry/output_postprocessor.py new file mode 100644 index 00000000..6b9fd586 --- /dev/null +++ b/lmentry/output_postprocessor.py @@ -0,0 +1,7 @@ +class PredictionPostprocessor: + + def __init__(self) -> None: + pass + + def postprocess(self, raw_output_prompts): + return raw_output_prompts \ No newline at end of file diff --git a/lmentry/predict.py b/lmentry/predict.py index 23db1cfb..f33ff414 100644 --- a/lmentry/predict.py +++ b/lmentry/predict.py @@ -11,53 +11,112 @@ from tasks.task_utils import all_tasks, get_task from lmentry.model_manager import ModelManager +from lmentry.input_preprocessor import PromptPreprocessor +from lmentry.output_postprocessor import PredictionPostprocessor logging.basicConfig(format='%(asctime)s %(message)s', datefmt='%Y/%m/%d %H:%M:%S', level=logging.INFO) -def _batcher(sequence, batch_size): +class PredictorBase(): + def __init__(self, + max_length: int=50, + batch_size: int=200, + samples_num: int=None, + data_path=None, + output_path=None): + self.max_length = max_length + self.batch_size = batch_size + self.samples_num = samples_num + self.data_path = data_path + self.output_path = output_path + + @staticmethod + def _batcher(sequence, batch_size): for i in range(0, len(sequence), batch_size): - yield sequence[i:i + batch_size] + yield sequence[i:i + batch_size] - -def _ms_since_epoch(): + @staticmethod + def _ms_since_epoch(): return time.perf_counter_ns() // 1000000 - -def get_part_from(examples: dict, samples_num: int=None): + def get_part_from(self, examples: dict): ex_num = len(examples) - if samples_num and samples_num < ex_num: - import random - cut_idx = sorted(random.sample(range(1, ex_num + 1), samples_num)) - cut_examples = dict() - - for idx in cut_idx: - cut_examples[str(idx)] = examples[str(idx)] - return cut_examples + if self.samples_num and self.samples_num < ex_num: + import random + cut_idx = sorted(random.sample(range(1, ex_num + 1), self.samples_num)) + cut_examples = dict() + + for idx in cut_idx: + cut_examples[str(idx)] = examples[str(idx)] + return cut_examples else: - return examples + return examples + def generate_task(self): + raise NotImplementedError("Generate task method is not implemented in base class") -def generate_task_hf_predictions(task_name, - manager: ModelManager = None, - model_name: str="", - max_length: int=50, - batch_size: int=200, - device: str="cuda", - samples_num: int=None, - use_vllm: bool=True, - data_path=None, - output_path=None): - task = get_task(task_name) + def save_predictions(self, + output_path, + examples, + preproc_input_prompts, + postproc_predictions) -> None: + predictions_data = dict() + for id_, input_, prediction in zip(examples, preproc_input_prompts, postproc_predictions): + predictions_data[id_] = {"input": input_, "prediction": prediction} + + with open(output_path, "w") as f_predictions: + json.dump(predictions_data, f_predictions, indent=2) + + def generate(self, + task_names: list[str] = None, + model_name: str = "", + device: str="cuda", + use_vllm: bool=False, + force_predict: bool=False): + task_names = task_names or all_tasks + # TODO(vvchernov): remove max_length + manager = ModelManager(model_name, device, self.max_length, use_vllm) + if manager.type == "mlc": + self.batch_size = 1 + for task_name in tqdm(task_names, desc="Predict tasks"): + task = get_task(task_name) + + # check task and skip it if it has been done + # TODO(vvchernov): remove samples num? + if force_predict or not task.is_predicted(manager.model_name, self.samples_num): + self.generate_task(task, manager, model_name, device, use_vllm) + + +class HFTaskPredictor(PredictorBase): + def __init__(self, + max_length: int=50, + batch_size: int=200, + samples_num: int=None, + data_path=None, + output_path=None): + super().__init__(self, + max_length, + batch_size, + samples_num, + data_path, + output_path) + + def generate_task(self, + task_name_or_obj, + manager: ModelManager = None, + model_name: str="", + device: str="cuda", + use_vllm: bool=True): + task = get_task(task_name_or_obj) if isinstance(task_name_or_obj, str) else task_name_or_obj if not model_name and not manager: - raise ValueError("must provide either `model_name` or `model manager`") + raise ValueError("must provide either `model_name` or `model manager`") if not manager: - manager = ModelManager(model_name, device) + manager = ModelManager(model_name, device) if manager.type == "mlc": - batch_size = 1 + self.batch_size = 1 - logging.info(f"generating predictions for task \"{task_name}\" with model \"{manager.predictor_name}\"") + logging.info(f"generating predictions for task \"{task.name}\" with model \"{manager.predictor_name}\"") # initialize tokenizer and model tokenizer = manager.get_tokenizer() @@ -68,99 +127,91 @@ def generate_task_hf_predictions(task_name, model = manager.model # load task data - examples = task.get_data(data_path) - examples = get_part_from(examples, samples_num) - # get the inputs from the task data - string_inputs = [example["input"] for example in examples.values()] + examples = task.get_data(self.data_path) + examples = self.get_part_from(examples) + + # get the initial inputs from the task data + raw_input_prompts = [example["input"] for example in examples.values()] + + # Preprocess input prompts if need + preprocessor = PromptPreprocessor() + preproc_input_prompts = preprocessor.preprocess(raw_input_prompts) # generate predictions - predictions: list[str] = [] + raw_predictions: list[str] = [] if use_vllm: - from vllm import SamplingParams - sampling_params = SamplingParams( - n=1, - temperature=0.8, - top_p=0.95, - use_beam_search=False, - ignore_eos=False, - max_tokens=100, - ) - - for batch_of_strings in tqdm(_batcher(string_inputs, batch_size), desc="Predict batch of requests"): - outputs = model.generate(batch_of_strings, sampling_params) - predictions.extend(outputs) + from vllm import SamplingParams + sampling_params = SamplingParams( + n=1, + temperature=0.8, + top_p=0.95, + use_beam_search=False, + ignore_eos=False, + max_tokens=100, + ) + + for batch_of_strings in tqdm(self._batcher(preproc_input_prompts, batch_size), desc="Predict batch of requests"): + outputs = model.generate(batch_of_strings, sampling_params) + raw_predictions.extend(outputs) else: - for batch_of_strings in tqdm(_batcher(string_inputs, batch_size), desc="Predict batch of requests"): - batched_encoding = tokenizer(batch_of_strings, padding="longest", return_tensors="pt") - batched_encoding = batched_encoding.to(manager.device) - tensor_inputs = batched_encoding["input_ids"] - prompt_len = tensor_inputs.shape[1] - tensor_outputs = model.generate(tensor_inputs, max_length=max_length + prompt_len) - outputs = tokenizer.batch_decode(tensor_outputs, skip_special_tokens=True) - predictions.extend(outputs) + for batch_of_strings in tqdm(self._batcher(preproc_input_prompts, batch_size), desc="Predict batch of requests"): + batched_encoding = tokenizer(batch_of_strings, padding="longest", return_tensors="pt") + batched_encoding = batched_encoding.to(manager.device) + tensor_inputs = batched_encoding["input_ids"] + prompt_len = tensor_inputs.shape[1] + tensor_outputs = model.generate(tensor_inputs, max_length=self.max_length + prompt_len) + outputs = tokenizer.batch_decode(tensor_outputs, skip_special_tokens=True) + raw_predictions.extend(outputs) + + # Postprocess output predictions if need + postprocessor = PredictionPostprocessor() + postproc_predictions = postprocessor.postprocess(raw_predictions) # save the predictions - predictions_data = dict() - for id_, input_, prediction in zip(examples, string_inputs, predictions): - predictions_data[id_] = {"input": input_, "prediction": prediction} - + # TODO(vvchernov): clean path defenition if '/' in manager.model_name: - manager.model_name = manager.short_name - output_path = output_path or task.predictions_dir.joinpath(manager.model_name).with_suffix(".vllm.json" if use_vllm else ".json") - - with open(output_path, "w") as f_predictions: - json.dump(predictions_data, f_predictions, indent=2) - - -def generate_all_hf_predictions(task_names: list[str] = None, model_name: str = "", - max_length=50, batch_size=200, device: str="cuda", - samples_num: int=None, use_vllm: bool=False, force_predict: bool=False): - task_names = task_names or all_tasks - manager = ModelManager(model_name, device, max_length, use_vllm) - if manager.type == "mlc": - batch_size = 1 - for task_name in tqdm(task_names, desc="Predict tasks"): - # check task and skip it if it has been done - task = get_task(task_name) - - if not force_predict: - # TODO(vvchernov): add advanced postfix (like "_vllm_b1000") - output_file = task.predictions_dir.joinpath(manager.model_name).with_suffix(".vllm.json" if use_vllm else ".json") - - if output_file.exists(): - with open(output_file) as task_json: - task_config = json.load(task_json) - if bool(task_config): - # TODO(vvchernov): it is excess check or we need discuss scenario for it - if samples_num and samples_num <= len(task_config): - logging.info(f"Task {task.name} was skipped due to it was done before. ({len(task_config)} generated vs. {samples_num} requested)\nUse '--force_predict' to force predictions generation.") - continue - - generate_task_hf_predictions(task_name, manager, model_name, max_length, batch_size, device, samples_num, use_vllm) - - -# todo make the saving of the metadata optional (with a default yes as we do it ourselves) -def generate_task_openai_predictions(task_name: str, model_name: str = "", max_tokens: int = -1, - data_path=None, output_path: Path = None, - overwrite_existing_predictions=False, - min_ms_between_api_calls: int = 20, - log_progress_every_n_examples: int = 100, - save_every_n_examples: int = 300, - org_name: str = ""): - task = get_task(task_name) + manager.model_name = manager.short_name + output_path = output_path or task.predictions_dir.joinpath(manager.model_name).with_suffix("_vllm.json" if use_vllm else ".json") + self.save_predictions(output_path, examples, preproc_input_prompts, postproc_predictions) + + +class OpenAIPredictor(PredictorBase): + def __init__(self, + max_length: int=-1, + batch_size: int=200, + samples_num: int=None, + data_path=None, + output_path=None): + super().__init__(self, + max_length, + batch_size, + samples_num, + data_path, + output_path) + + # todo make the saving of the metadata optional (with a default yes as we do it ourselves) + # TODO(vvchernov): upstream args + def generate_task( + self, + task_name_or_obj, + model_name: str = "", + overwrite_existing_predictions=False, + min_ms_between_api_calls: int = 20, + log_progress_every_n_examples: int = 100, + save_every_n_examples: int = 300, + org_name: str = "" + ): + task = get_task(task_name_or_obj) if isinstance(task_name_or_obj, str) else task_name_or_obj # load task data - data_path = data_path or task.default_data_path - with open(data_path) as f_examples: - data = json.load(f_examples) - # get the inputs from the task data - examples = data["examples"] + examples = task.get_data(self.data_path) + examples = self.get_part_from(examples) if save_every_n_examples > len(examples): - save_every_n_examples = len(examples) + save_every_n_examples = len(examples) - output_path = output_path or task.predictions_dir.joinpath(model_name).with_suffix(".json") + output_path = self.output_path or task.predictions_dir.joinpath(model_name).with_suffix(".json") output_with_metadata_path = output_path.with_stem(f"{output_path.stem}_with_metadata") logging.info(f"generating predictions for {task.name} with OpenAI {model_name}") @@ -169,118 +220,302 @@ def generate_task_openai_predictions(task_name: str, model_name: str = "", max_t # (e.g. if the openai API failed before finishing to generate predictions for all examples) id_to_start_predictions_from = 1 if overwrite_existing_predictions or not output_path.is_file(): - predictions = dict() + predictions = dict() else: - with open(output_with_metadata_path) as preexisting_predictions_f: - # we use `output_with_metadata_path` here and not `output` as in this method - # `predictions` include the metadata. - predictions = json.load(preexisting_predictions_f) - # get the first id we should start to predict from - n_preexisting_predictions = len(predictions) - id_to_start_predictions_from = n_preexisting_predictions + 1 - if 0 < n_preexisting_predictions < len(examples): - logging.info(f"{output_path} already contains the first {n_preexisting_predictions} predictions. starting to generate predictions from id {id_to_start_predictions_from}") - elif n_preexisting_predictions == len(examples): - logging.info(f"{output_path} already contains all {len(examples)} predictions. to overwrite, set overwrite_existing_predictions=True") + with open(output_with_metadata_path) as preexisting_predictions_f: + # we use `output_with_metadata_path` here and not `output` as in this method + # `predictions` include the metadata. + predictions = json.load(preexisting_predictions_f) + # get the first id we should start to predict from + n_preexisting_predictions = len(predictions) + id_to_start_predictions_from = n_preexisting_predictions + 1 + if 0 < n_preexisting_predictions < len(examples): + logging.info(f"{output_path} already contains the first {n_preexisting_predictions} predictions. starting to generate predictions from id {id_to_start_predictions_from}") + elif n_preexisting_predictions == len(examples): + logging.info(f"{output_path} already contains all {len(examples)} predictions. to overwrite, set overwrite_existing_predictions=True") # openai API setup and parameters openai.organization = org_name openai.api_key = os.getenv("OPENAI_API_KEY") parameters = { - "max_tokens": max_tokens, - "top_p": 0, # greedy - "temperature": 1, - "logprobs": 5, # maximal value accorrding to https://beta.openai.com/docs/api-reference/completions/create#completions/create-logprobs, used to be 10... - "model": model_name + "max_tokens": self.max_length, + "top_p": 0, # greedy + "temperature": 1, + "logprobs": 5, # maximal value accorrding to https://beta.openai.com/docs/api-reference/completions/create#completions/create-logprobs, used to be 10... + "model": model_name } - time_of_last_api_call = _ms_since_epoch() + time_of_last_api_call = self._ms_since_epoch() # to save time when running the cheaper models, we'll save every 1000 examples if save_every_n_examples < 1000 and ("curie" in model_name or "babbage" in model_name or "ada" in model_name): - save_every_n_examples = 1000 + save_every_n_examples = 1000 for id_ in range(id_to_start_predictions_from, len(examples) + 1): - id_ = str(id_) - prompt = examples[id_]["input"] - parameters["prompt"] = prompt - - # OpenAI limits us to 3000 calls per minute: - # https://help.openai.com/en/articles/5955598-is-api-usage-subject-to-any-rate-limits - # that is why the default value of min_ms_between_api_calls is 20 - if (cur_time := _ms_since_epoch()) <= time_of_last_api_call + min_ms_between_api_calls: - ms_to_sleep = min_ms_between_api_calls - (cur_time - time_of_last_api_call) - time.sleep(ms_to_sleep / 1000) - time_of_last_api_call = _ms_since_epoch() - - response = openai.Completion.create(**parameters) - - # build output data - predictions[id_] = dict() - predictions[id_]["input"] = prompt - predictions[id_]["prediction"] = response.choices[0].text - - # build output metadata - metadata = dict() - metadata["logprobs"] = response.choices[0]["logprobs"] - finish_reason = response.choices[0]["finish_reason"] - metadata["finish_reason"] = finish_reason - - # From the OpenAI API documentation it's not clear what "index" is, but let's keep it as well - metadata["index"] = response.choices[0]["index"] - - predictions[id_]["metadata"] = metadata - - if int(id_) % log_progress_every_n_examples == 0: - logging.info(f'generated predictions up to id {int(id_)} for {task.name} using OpenAI {model_name}') - if int(id_) % save_every_n_examples == 0: - # todo using jsonl instead of json would save all the rewriting, but I choose to - # keep the io overhead for now in favor of if it ain't broken don't fix it - # save a version of the predictions that contains the prediction metadata - with open(output_with_metadata_path, "w") as f_predictions_with_metadata: - json.dump(predictions, f_predictions_with_metadata, indent=2) - # save the predictions without the metadata - predictions_without_metadata = dict() - for id_ in predictions: - predictions_without_metadata[id_] = dict() - for field_name in predictions[id_]: - if field_name != "metadata": - predictions_without_metadata[id_][field_name] = predictions[id_][field_name] - with open(output_path, "w") as f_predictions: - json.dump(predictions_without_metadata, f_predictions, indent=2) - - logging.info(f'saved predictions up to id {int(id_)} for {task.name} using OpenAI {model_name}') + id_ = str(id_) + prompt = examples[id_]["input"] + parameters["prompt"] = prompt + + # OpenAI limits us to 3000 calls per minute: + # https://help.openai.com/en/articles/5955598-is-api-usage-subject-to-any-rate-limits + # that is why the default value of min_ms_between_api_calls is 20 + if (cur_time := self._ms_since_epoch()) <= time_of_last_api_call + min_ms_between_api_calls: + ms_to_sleep = min_ms_between_api_calls - (cur_time - time_of_last_api_call) + time.sleep(ms_to_sleep / 1000) + time_of_last_api_call = self._ms_since_epoch() + + response = openai.Completion.create(**parameters) + + # build output data + predictions[id_] = dict() + predictions[id_]["input"] = prompt + predictions[id_]["prediction"] = response.choices[0].text + + # build output metadata + metadata = dict() + metadata["logprobs"] = response.choices[0]["logprobs"] + finish_reason = response.choices[0]["finish_reason"] + metadata["finish_reason"] = finish_reason + + # From the OpenAI API documentation it's not clear what "index" is, but let's keep it as well + metadata["index"] = response.choices[0]["index"] + + predictions[id_]["metadata"] = metadata + + if int(id_) % log_progress_every_n_examples == 0: + logging.info(f'generated predictions up to id {int(id_)} for {task.name} using OpenAI {model_name}') + if int(id_) % save_every_n_examples == 0: + # todo using jsonl instead of json would save all the rewriting, but I choose to + # keep the io overhead for now in favor of if it ain't broken don't fix it + # save a version of the predictions that contains the prediction metadata + with open(output_with_metadata_path, "w") as f_predictions_with_metadata: + json.dump(predictions, f_predictions_with_metadata, indent=2) + # save the predictions without the metadata + predictions_without_metadata = dict() + for id_ in predictions: + predictions_without_metadata[id_] = dict() + for field_name in predictions[id_]: + if field_name != "metadata": + predictions_without_metadata[id_][field_name] = predictions[id_][field_name] + with open(output_path, "w") as f_predictions: + json.dump(predictions_without_metadata, f_predictions, indent=2) + + logging.info(f'saved predictions up to id {int(id_)} for {task.name} using OpenAI {model_name}') # save remaining unsaved predictions (if any) n_generated_predictions = len(predictions) - id_to_start_predictions_from + 1 if n_generated_predictions % save_every_n_examples != 0: - with open(output_with_metadata_path, "w") as f_predictions_with_metadata: - json.dump(predictions, f_predictions_with_metadata, indent=2) + with open(output_with_metadata_path, "w") as f_predictions_with_metadata: + json.dump(predictions, f_predictions_with_metadata, indent=2) - for id_ in predictions: - del predictions[id_]["metadata"] - with open(output_path, "w") as f_predictions: - json.dump(predictions, f_predictions, indent=2) + for id_ in predictions: + del predictions[id_]["metadata"] + with open(output_path, "w") as f_predictions: + json.dump(predictions, f_predictions, indent=2) logging.info( - f'finished generating predictions for all {len(examples)} examples of {task.name} using OpenAI {model_name}') - - -def generate_all_openai_predictions(task_names: list[str] = None, model_name: str = "", max_tokens: int = -1, - num_processes: int = 1, - data_path=None, output_path: Path = None, - overwrite_existing_predictions=False, - log_progress_every_n_examples: int = 100, - save_every_n_examples: int = 300, - org_name: str = ""): + f'finished generating predictions for all {len(examples)} examples of {task.name} using OpenAI {model_name}') + + + # TODO(vvchernov): upstream with base class method + def generate( + self, + task_names: list[str] = None, + model_name: str = "", + num_processes: int = 1, + data_path=None, + output_path: Path = None, + overwrite_existing_predictions=False, + log_progress_every_n_examples: int = 100, + save_every_n_examples: int = 300, + org_name: str = "" + ): if task_names is None: - task_names = all_tasks + task_names = all_tasks min_ms_between_api_calls = num_processes * 20 # OpenAI limits us to 3000 calls per minute. - # create arguments for generate_task_openai_predictions: - starargs = zip(task_names, repeat(model_name), repeat(max_tokens), repeat(data_path), repeat(output_path), + # create arguments for generate_task: + starargs = zip(task_names, repeat(model_name), repeat(data_path), repeat(output_path), repeat(overwrite_existing_predictions), repeat(min_ms_between_api_calls), repeat(log_progress_every_n_examples), repeat(save_every_n_examples), repeat(org_name)) with Pool(processes=num_processes) as pool: - pool.starmap(generate_task_openai_predictions, starargs) + pool.starmap(self.generate_task, starargs) + + +class OctoAIPredictor(PredictorBase): + def __init__(self, + max_length: int=256, + batch_size: int=200, + samples_num: int=None, + data_path=None, + output_path=None): + super().__init__(self, + max_length, + batch_size, + samples_num, + data_path, + output_path) + + def call_octoai_inference(self, user_input, model_name): + import requests + # Load environment variables from .env file + #load_dotenv() + + # Get the API key from the environment variables + api_key = OCTOAI_API_KEY#os.getenv("OCTOAI_API_KEY") + + if api_key is None: + raise ValueError("API_KEY not found in the .env file") + + url = "https://llama-2-70b-chat-demo-kk0powt97tmb.octoai.run/v1/chat/completions" + + headers = { + "accept": "text/event-stream", + "authorization": f"Bearer {api_key}", + "content-type": "application/json", + } + + # TODO(vvchernov): model name hard code + model_name = "llama-2-70b-chat" + data = { + "model": model_name, + "messages": [ + { + "role": "assistant", + "content": "Below is an instruction that describes a task. Write a response that appropriately completes the request." + }, + { + "role": "user", + "content": user_input + } + ], + "stream": False, + "max_tokens": 256 + } + + response = requests.post(url, headers=headers, json=data) + + if response.status_code != 200: + print(f"Error: {response.status_code} - {response.text}") + + return response + + def generate_task_parallel(self, id_, examples, model_name, predictions): + id_ = str(id_) + prompt = examples[id_]["input"] + response = self.call_octoai_inference(prompt, model_name) + + predictions[id_] = dict() + predictions[id_]["input"] = prompt + + response = json.loads(response.text) + + predictions[id_]["prediction"] = response['choices'][0]['message']['content'] + print(predictions[id_]) + + # TODO(vvchernov): upstream args + def generate_task(self, + task_name_or_obj, + manager: ModelManager = None, + model_name: str="", + device: str="cuda", + use_vllm: bool=True, + overwrite_existing_predictions=False, + log_progress_every_n_examples: int = 10, + save_every_n_examples: int = 300): + + import concurrent.futures + # TODO(vvchernov): looks like OpenAI pipeline + task = get_task(task_name_or_obj) if isinstance(task_name_or_obj, str) else task_name_or_obj + + # load task data + examples = task.get_data(self.data_path) + examples = self.get_part_from(examples) + + if save_every_n_examples > len(examples): + save_every_n_examples = len(examples) + + output_path = output_path or task.predictions_dir.joinpath(model_name).with_suffix(".json") + output_with_metadata_path = output_path.with_stem(f"{output_path.stem}_with_metadata") + + logging.info(f"generating predictions for {task.name} with OctoAI {model_name}") + + # check if we already have some predictions + # (e.g. if the openai API failed before finishing to generate predictions for all examples) + id_to_start_predictions_from = 1 + if overwrite_existing_predictions or not output_path.is_file(): + predictions = dict() + else: + with open(output_with_metadata_path) as preexisting_predictions_f: + # we use `output_with_metadata_path` here and not `output` as in this method + # `predictions` include the metadata. + predictions = json.load(preexisting_predictions_f) + # get the first id we should start to predict from + n_preexisting_predictions = len(predictions) + id_to_start_predictions_from = n_preexisting_predictions + 1 + if 0 < n_preexisting_predictions < len(examples): + logging.info(f"{output_path} already contains the first {n_preexisting_predictions} predictions. starting to generate predictions from id {id_to_start_predictions_from}") + elif n_preexisting_predictions == len(examples): + logging.info(f"{output_path} already contains all {len(examples)} predictions. to overwrite, set overwrite_existing_predictions=True") + + with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor: + futures = [] + for id_ in range(id_to_start_predictions_from, 150): #len(examples) + 1): + futures.append(executor.submit(self.generate_task_parallel, id_, examples, model_name, predictions)) + + for future in concurrent.futures.as_completed(futures): + try: + future.result() + except Exception as exc: + logging.error(f"Error generating predictions: {exc}") + + if int(id_) % log_progress_every_n_examples == 0: + logging.info(f'generated predictions up to id {int(id_)} for {task.name} using OpenAI {model_name}') + if int(id_) % save_every_n_examples == 0: + # todo using jsonl instead of json would save all the rewriting, but I choose to + # keep the io overhead for now in favor of if it ain't broken don't fix it + # save a version of the predictions that contains the prediction metadata + with open(output_with_metadata_path, "w") as f_predictions_with_metadata: + json.dump(predictions, f_predictions_with_metadata, indent=2) + # save the predictions without the metadata + predictions_without_metadata = dict() + for id_ in predictions: + predictions_without_metadata[id_] = dict() + for field_name in predictions[id_]: + if field_name != "metadata": + predictions_without_metadata[id_][field_name] = predictions[id_][field_name] + with open(output_path, "w") as f_predictions: + json.dump(predictions_without_metadata, f_predictions, indent=2) + + logging.info(f'saved predictions up to id {int(id_)} for {task.name} using OpenAI {model_name}') + + # save remaining unsaved predictions (if any) + n_generated_predictions = len(predictions) - id_to_start_predictions_from + 1 + if n_generated_predictions % save_every_n_examples != 0: + with open(output_with_metadata_path, "w") as f_predictions_with_metadata: + json.dump(predictions, f_predictions_with_metadata, indent=2) + with open(output_path, "w") as f_predictions: + json.dump(predictions, f_predictions, indent=2) + + logging.info( + f'finished generating predictions for all {len(examples)} examples of {task.name} using OpenAI {model_name}') + + return + +class PredictorFactory(): + predictors_map = { + "hf": HFTaskPredictor, + "openai": OpenAIPredictor, + "octoai": OctoAIPredictor, + } + + def __init__(self) -> None: + pass + + @staticmethod + def get_predictor(name: str, **kwargs): + if name in PredictorFactory.predictors_map.keys(): + return PredictorFactory.predictors_map[name](**kwargs) + else: + raise NotImplementedError(f"Predictor with name {name} is not supported!") diff --git a/predict_model.py b/predict_model.py index afe57ba3..b9e0f93a 100644 --- a/predict_model.py +++ b/predict_model.py @@ -1,7 +1,7 @@ import argparse from tqdm import tqdm -from lmentry.predict import generate_all_hf_predictions +from lmentry.predict import PredictorFactory from tasks.task_utils import get_tasks_names, task_groups, all_tasks from lmentry.constants import DEFAULT_MAX_LENGTH @@ -15,6 +15,8 @@ def parse_args(): f"Task set names should be from the list: {task_groups.keys()}. " f"Task names should be from the list: {all_tasks}. " "It tries to analyze all tasks by default") + parser.add_argument("-p", "--predictor_type", type=str, default="hf", + help=f"Type of predictor, can be chosen from the list: {PredictorFactory.predictors_map.keys()}") parser.add_argument('-d', '--device', type=str, default="cuda", help="Device name. It is needed and used by mlc model only") parser.add_argument('-b', '--batch_size', type=int, default=100, @@ -24,9 +26,9 @@ def parse_args(): "If set 'None' or the value is bigger than all samples number - all samples will be chosen.") parser.add_argument('-ml', '--max_length', type=int, default=DEFAULT_MAX_LENGTH, help="Output max length") - parser.add_argument('--use_vllm', type=bool, default=False, - help="Whether to use vLLM inference. ") - parser.add_argument('-fp', '--force_predict', type=bool, default=False, + parser.add_argument('-uv', '--use_vllm', action='store_true', default=False, + help="Whether to use vLLM inference.") + parser.add_argument('-fp', '--force_predict', action="store_true", default=False, help="Whether to force regenerate predictions.") args = parser.parse_args() @@ -45,15 +47,20 @@ def main(): # TODO: Check that all other TODOs are done or marked [V] task_names = get_tasks_names(args.task_names) + # Init predictor + predictor = PredictorFactory.get_predictor( + name=args.predictor_type, + max_length=args.max_length, + batch_size=args.batch_size, + samples_num=args.samples_num, + ) + for model_name in tqdm(args.model_names, desc="Predict specified models"): print(f"Prediction of tasks for {model_name} model starts") - generate_all_hf_predictions( + predictor.generate( task_names=task_names, model_name=model_name, - max_length=args.max_length, - batch_size=args.batch_size, device=args.device, - samples_num=args.samples_num, use_vllm=args.use_vllm, force_predict=args.force_predict, ) diff --git a/tasks/task.py b/tasks/task.py index be867dd1..e2aab598 100644 --- a/tasks/task.py +++ b/tasks/task.py @@ -25,6 +25,19 @@ def __init__(self, name): def create_data(self): raise NotImplementedError + def is_predicted(self, model_name, samples_num): + output_file = self.predictions_dir.joinpath(model_name).with_suffix(".json") + + if output_file.exists(): + with open(output_file) as task_json: + task_config = json.load(task_json) + if bool(task_config): + # TODO(vvchernov): it is excess check or we need discuss scenario for it + if samples_num and samples_num <= len(task_config): + logging.info(f"Task {self.name} was skipped due to it was done before. ({len(task_config)} generated vs. {samples_num} requested)\nUse '--force_predict' to force predictions generation.") + return True + return False + def get_data(self, data_path = None): data_path = data_path or self.default_data_path with open(data_path) as f_examples: