Skip to content

Add support for async predictors #1813

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 31 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
3277023
run CI for this branch the same way as for main
technillogue Jan 16, 2024
a76a012
async runner (#1352)
technillogue Jan 16, 2024
b947c90
support async predict functions (#1350)
technillogue Oct 30, 2023
7e63a48
AsyncConcatenateIterator
technillogue May 7, 2024
40180a8
create event loop before predictor setup (#1366)
technillogue Jan 16, 2024
78879e4
minimal async worker (#1410)
technillogue Jan 22, 2024
7624116
Mux prediction events (#1405)
technillogue Feb 12, 2024
a616653
replace requests with httpx and factor out clients (#1574, #1707, #1714)
technillogue Mar 29, 2024
5904b60
implement mp.Connection with async streams (#1640)
technillogue May 7, 2024
ca14e0d
optimize webhook serialization and logging (#1651)
technillogue May 8, 2024
2be1715
tweak names and style
technillogue May 8, 2024
da1e997
omnibus actual concurrency and major refactor (#1530)
technillogue May 16, 2024
f77c205
function to emit metrics (#1649)
technillogue May 17, 2024
652074a
predict_time_share metric (#1643, #1683)
technillogue May 22, 2024
840cb3c
log traceback properly (#1734)
technillogue Jun 12, 2024
3875d00
add batch size metric (#1750)
technillogue Jun 18, 2024
403d52e
fix various lints
technillogue Jul 2, 2024
16a5811
Poison model healthcheck on shutdown
nickstenning Jul 3, 2024
0955c92
Propagate trace context to webhook and upload requests
aron Jul 4, 2024
66c7ed7
[async] Include prediction id upload request (#1788)
aron Jul 17, 2024
c856770
Passing tests
mattt Jul 18, 2024
52ba22f
Revert "run CI for this branch the same way as for main"
mattt Jul 18, 2024
7b69bc4
Implement webhook and file tests
mattt Jul 19, 2024
bd6bdf2
Add missing tests for data: URIs
mattt Jul 19, 2024
b215cea
Delete .tool-versions
mattt Jul 22, 2024
560b665
Add pylint tool settings to pyproject.toml
mattt Jul 22, 2024
59d0809
Ignore pylint warnings
mattt Jul 22, 2024
46c4980
Resolve pylint warnings
mattt Jul 22, 2024
a73266f
Remove duplicate and unused WorkerState declaration
mattt Jul 22, 2024
a9cf05d
Refactor get_weights_argument
mattt Jul 22, 2024
835b658
fix shutdown bugs (#1819, #1843)
technillogue Jul 23, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion .tool-versions

This file was deleted.

14 changes: 14 additions & 0 deletions docs/metrics.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
# Metrics

Prediction objects have a `metrics` field. This normally includes `predict_time` and `total_time`. Official language models have metrics like `input_token_count`, `output_token_count`, `tokens_per_second`, and `time_to_first_token`. Currently, custom metrics from Cog are ignored when running on Replicate. Official Replicate-published models are the only exception to this. When running outside of Replicate, you can emit custom metrics like this:


```python
import cog
from cog import BasePredictor, Path

class Predictor(BasePredictor):
def predict(self, width: int, height: int) -> Path:
"""Run a single prediction on the model"""
cog.emit_metric(name="pixel_count", value=width * height)
```
13 changes: 9 additions & 4 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,16 +57,21 @@ type Build struct {
pythonRequirementsContent []string
}

type Concurrency struct {
Max int `json:"max,omitempty" yaml:"max"`
}

type Example struct {
Input map[string]string `json:"input" yaml:"input"`
Output string `json:"output" yaml:"output"`
}

type Config struct {
Build *Build `json:"build" yaml:"build"`
Image string `json:"image,omitempty" yaml:"image"`
Predict string `json:"predict,omitempty" yaml:"predict"`
Train string `json:"train,omitempty" yaml:"train"`
Build *Build `json:"build" yaml:"build"`
Image string `json:"image,omitempty" yaml:"image"`
Predict string `json:"predict,omitempty" yaml:"predict"`
Train string `json:"train,omitempty" yaml:"train"`
Concurrency *Concurrency `json:"concurrency,omitempty" yaml:"concurrency"`
}

func DefaultConfig() *Config {
Expand Down
5 changes: 0 additions & 5 deletions pkg/config/data/config_schema_v1.0.json
Original file line number Diff line number Diff line change
Expand Up @@ -154,11 +154,6 @@
"$id": "#/properties/concurrency/properties/max",
"type": "integer",
"description": "The maximum number of concurrent predictions."
},
"default_target": {
"$id": "#/properties/concurrency/properties/default_target",
"type": "integer",
"description": "The default target for number of concurrent predictions. This setting can be used by an autoscaler to determine when to scale a deployment of a model up or down."
}
}
}
Expand Down
22 changes: 20 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,13 @@ authors = [{ name = "Replicate", email = "[email protected]" }]
license.file = "LICENSE"
urls."Source" = "https://github.com/replicate/cog"

requires-python = ">=3.7"
requires-python = ">=3.8"
dependencies = [
# intentionally loose. perhaps these should be vendored to not collide with user code?
"attrs>=20.1,<24",
"fastapi>=0.75.2,<0.99.0",
# we may not need http2
"httpx[http2]>=0.21.0,<1",
"pydantic>=1.9,<2",
"PyYAML",
"requests>=2,<3",
Expand All @@ -27,14 +29,15 @@ dependencies = [
optional-dependencies = { "dev" = [
"black",
"build",
"httpx",
'hypothesis<6.80.0; python_version < "3.8"',
'hypothesis; python_version >= "3.8"',
"respx",
'numpy<1.22.0; python_version < "3.8"',
'numpy; python_version >= "3.8"',
"pillow",
"pyright==1.1.347",
"pytest",
"pytest-asyncio",
"pytest-httpserver",
"pytest-rerunfailures",
"pytest-xdist",
Expand Down Expand Up @@ -66,6 +69,21 @@ reportUnusedExpression = "warning"
[tool.setuptools]
package-dir = { "" = "python" }

[tool.pylint.main]
disable = [
"C0114", # Missing module docstring
"C0115", # Missing class docstring
"C0116", # Missing function or method docstring
"C0301", # Line too long
"C0413", # Import should be placed at the top of the module
"R0903", # Too few public methods
"W0622", # Redefining built-in
]
good-names = ["id", "input"]

ignore-paths = ["python/cog/_version.py", "python/tests"]


[tool.ruff]
lint.select = [
"E", # pycodestyle error
Expand Down
12 changes: 11 additions & 1 deletion python/cog/__init__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,15 @@
from pydantic import BaseModel

from .predictor import BasePredictor
from .types import ConcatenateIterator, File, Input, Path, Secret
from .server.worker import emit_metric
from .types import (
AsyncConcatenateIterator,
ConcatenateIterator,
File,
Input,
Path,
Secret,
)

try:
from ._version import __version__
Expand All @@ -14,8 +22,10 @@
"BaseModel",
"BasePredictor",
"ConcatenateIterator",
"AsyncConcatenateIterator",
"File",
"Input",
"Path",
"Secret",
"emit_metric",
]
24 changes: 14 additions & 10 deletions python/cog/code_xforms.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ def load_module_from_string(
if not source or not name:
return None
module = types.ModuleType(name)
exec(source, module.__dict__) # noqa: S102
exec(source, module.__dict__) # noqa: S102 # pylint: disable=exec-used
return module


Expand All @@ -32,7 +32,7 @@ class ClassExtractor(ast.NodeVisitor):
def __init__(self) -> None:
self.class_source = None

def visit_ClassDef(self, node: ast.ClassDef) -> None:
def visit_ClassDef(self, node: ast.ClassDef) -> None: # pylint: disable=invalid-name
if node.name in all_class_names:
self.class_source = ast.get_source_segment(source_code, node)

Expand All @@ -56,7 +56,7 @@ class FunctionExtractor(ast.NodeVisitor):
def __init__(self) -> None:
self.function_source = None

def visit_FunctionDef(self, node: ast.FunctionDef) -> None:
def visit_FunctionDef(self, node: ast.FunctionDef) -> None: # pylint: disable=invalid-name
if node.name == function_name and not isinstance(node, ast.Module):
# Extract the source segment for this function definition
self.function_source = ast.get_source_segment(source_code, node)
Expand All @@ -79,14 +79,16 @@ def make_class_methods_empty(source_code: Union[str, ast.AST], class_name: str)
"""

class MethodBodyTransformer(ast.NodeTransformer):
def visit_ClassDef(self, node: ast.ClassDef) -> Optional[ast.AST]:
def visit_ClassDef(self, node: ast.ClassDef) -> Optional[ast.AST]: # pylint: disable=invalid-name
if node.name == class_name:
for body_item in node.body:
if isinstance(body_item, ast.FunctionDef):
# Replace the body of the method with `return None`
body_item.body = [ast.Return(value=ast.Constant(value=None))]
return node

return None

tree = source_code if isinstance(source_code, ast.AST) else ast.parse(source_code)
transformer = MethodBodyTransformer()
transformed_tree = transformer.visit(tree)
Expand All @@ -111,11 +113,11 @@ class MethodReturnTypeExtractor(ast.NodeVisitor):
def __init__(self) -> None:
self.return_type = None

def visit_ClassDef(self, node: ast.ClassDef) -> None:
def visit_ClassDef(self, node: ast.ClassDef) -> None: # pylint: disable=invalid-name
if node.name == class_name:
self.generic_visit(node)

def visit_FunctionDef(self, node: ast.FunctionDef) -> None:
def visit_FunctionDef(self, node: ast.FunctionDef) -> None: # pylint: disable=invalid-name
if node.name == method_name and node.returns:
self.return_type = ast.unparse(node.returns)

Expand All @@ -142,7 +144,7 @@ class FunctionReturnTypeExtractor(ast.NodeVisitor):
def __init__(self) -> None:
self.return_type = None

def visit_FunctionDef(self, node: ast.FunctionDef) -> None:
def visit_FunctionDef(self, node: ast.FunctionDef) -> None: # pylint: disable=invalid-name
if node.name == function_name and node.returns:
# Extract and return the string representation of the return type
self.return_type = ast.unparse(node.returns)
Expand All @@ -166,12 +168,14 @@ def make_function_empty(source_code: Union[str, ast.AST], function_name: str) ->
"""

class FunctionBodyTransformer(ast.NodeTransformer):
def visit_FunctionDef(self, node: ast.FunctionDef) -> Optional[ast.AST]:
def visit_FunctionDef(self, node: ast.FunctionDef) -> Optional[ast.AST]: # pylint: disable=invalid-name
if node.name == function_name:
# Replace the body of the function with `return None`
node.body = [ast.Return(value=ast.Constant(value=None))]
return node

return None

tree = source_code if isinstance(source_code, ast.AST) else ast.parse(source_code)
transformer = FunctionBodyTransformer()
transformed_tree = transformer.visit(tree)
Expand All @@ -195,12 +199,12 @@ class ImportExtractor(ast.NodeVisitor):
def __init__(self) -> None:
self.imports = []

def visit_Import(self, node: ast.Import) -> None:
def visit_Import(self, node: ast.Import) -> None: # pylint: disable=invalid-name
for alias in node.names:
if alias.name in module_names:
self.imports.append(ast.unparse(node))

def visit_ImportFrom(self, node: ast.ImportFrom) -> None:
def visit_ImportFrom(self, node: ast.ImportFrom) -> None: # pylint: disable=invalid-name
if node.module in module_names:
self.imports.append(ast.unparse(node))

Expand Down
37 changes: 27 additions & 10 deletions python/cog/command/ast_openapi_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,24 @@
"summary": "Healthcheck"
}
},
"/ready": {
"get": {
"summary": "Ready",
"operationId": "ready_ready_get",
"responses": {
"200": {
"description": "Successful Response",
"content": {
"application/json": {
"schema": {
"title": "Response Ready Ready Get"
}
}
}
}
}
}
},
"/predictions": {
"post": {
"description": "Run a single prediction on the model",
Expand Down Expand Up @@ -324,13 +342,12 @@ def find(obj: ast.AST, name: str) -> ast.AST:
def to_serializable(val: "AstVal") -> "JSONObject":
if isinstance(val, bytes):
return val.decode("utf-8")
elif isinstance(val, list):
if isinstance(val, list):
return [to_serializable(x) for x in val]
elif isinstance(val, complex):
if isinstance(val, complex):
msg = "complex inputs are not supported"
raise ValueError(msg)
else:
return val
return val


def get_value(node: ast.AST) -> "AstVal":
Expand Down Expand Up @@ -372,7 +389,7 @@ def get_call_name(call: ast.Call) -> str:
def parse_args(tree: ast.AST) -> "list[tuple[ast.arg, ast.expr | types.EllipsisType]]":
"""Parse argument, default pairs from a file with a predict function"""
predict = find(tree, "predict")
assert isinstance(predict, ast.FunctionDef)
assert isinstance(predict, (ast.FunctionDef, ast.AsyncFunctionDef))
args = predict.args.args # [-len(defaults) :]
# use Ellipsis instead of None here to distinguish a default of None
defaults = [...] * (len(args) - len(predict.args.defaults)) + predict.args.defaults
Expand Down Expand Up @@ -449,7 +466,7 @@ def parse_return_annotation(
tree: ast.AST, fn: str = "predict"
) -> "tuple[JSONDict, JSONDict]":
predict = find(tree, fn)
if not isinstance(predict, ast.FunctionDef):
if not isinstance(predict, (ast.FunctionDef, ast.AsyncFunctionDef)):
raise ValueError("Could not find predict function")
annotation = predict.returns
if not annotation:
Expand All @@ -472,8 +489,8 @@ def predict(
name = resolve_name(annotation)
if isinstance(annotation, ast.Subscript):
# forget about other subscripts like Optional, and assume otherlib.File will still be an uri
slice = resolve_name(annotation.slice)
format = {"format": "uri"} if slice in ("Path", "File") else {}
slice = resolve_name(annotation.slice) # pylint: disable=redefined-builtin
format = {"format": "uri"} if slice in ("Path", "File") else {} # pylint: disable=redefined-builtin
array_type = {"x-cog-array-type": "iterator"} if "Iterator" in name else {}
display_type = (
{"x-cog-array-display": "concatenate"} if "Concatenate" in name else {}
Expand Down Expand Up @@ -503,7 +520,7 @@ def predict(
KEPT_ATTRS = ("description", "default", "ge", "le", "max_length", "min_length", "regex")


def extract_info(code: str) -> "JSONDict":
def extract_info(code: str) -> "JSONDict": # pylint: disable=too-many-branches,too-many-locals
"""Parse the schemas from a file with a predict function"""
tree = ast.parse(code)
properties: JSONDict = {}
Expand All @@ -526,7 +543,7 @@ def extract_info(code: str) -> "JSONDict":
kws = {}
else:
raise ValueError("Unexpected default value", default)
input: JSONDict = {"x-order": len(properties)}
input: JSONDict = {"x-order": len(properties)} # pylint: disable=redefined-builtin
# need to handle other types?
arg_type = OPENAPI_TYPES.get(get_annotation(arg.annotation), "string")
if get_annotation(arg.annotation) in ("Path", "File"):
Expand Down
Loading