-
Notifications
You must be signed in to change notification settings - Fork 175
Expand file tree
/
Copy pathkernel.py
More file actions
697 lines (632 loc) · 27.5 KB
/
kernel.py
File metadata and controls
697 lines (632 loc) · 27.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
from __future__ import annotations
import asyncio
import functools
import gzip
import io
import logging
import lzma
import os
import re
import shutil
import subprocess
import textwrap
from collections.abc import Mapping, MutableMapping
from importlib.resources import files
from pathlib import Path, PurePosixPath
from typing import Any, Final, cast, override
import aiohttp
import janus
from aiodocker.docker import Docker, DockerVolume
from aiodocker.exceptions import DockerError
from aiotools import TaskGroup
from ai.backend.agent.config.unified import AgentUnifiedConfig
from ai.backend.agent.docker.utils import PersistentServiceContainer
from ai.backend.agent.errors import KernelRunnerNotInitializedError, SubprocessStreamError
from ai.backend.agent.kernel import AbstractCodeRunner, AbstractKernel
from ai.backend.agent.resources import KernelResourceSpec
from ai.backend.agent.types import AgentEventData, KernelOwnershipData
from ai.backend.agent.utils import closing_async, get_arch_name
from ai.backend.common.asyncio import current_loop
from ai.backend.common.docker import ImageRef
from ai.backend.common.dto.agent.response import CodeCompletionResp
from ai.backend.common.events.dispatcher import EventProducer
from ai.backend.common.lock import FileLock
from ai.backend.common.types import CommitStatus, KernelId, Sentinel, SessionId
from ai.backend.logging import BraceStyleAdapter
from ai.backend.plugin.entrypoint import scan_entrypoints
log = BraceStyleAdapter(logging.getLogger(__spec__.name))
DEFAULT_CHUNK_SIZE: Final = 256 * 1024 # 256 KiB
DEFAULT_INFLIGHT_CHUNKS: Final = 8
class DockerKernel(AbstractKernel):
network_driver: str
def __init__(
self,
ownership_data: KernelOwnershipData,
network_id: str,
image: ImageRef,
version: int,
network_driver: str,
*,
agent_config: Mapping[str, Any],
resource_spec: KernelResourceSpec,
service_ports: Any, # TODO: type-annotation
environ: Mapping[str, Any],
data: dict[str, Any],
) -> None:
super().__init__(
ownership_data,
network_id,
image,
version,
agent_config=agent_config,
resource_spec=resource_spec,
service_ports=service_ports,
data=data,
environ=environ,
)
self.network_driver = network_driver
@override
async def close(self) -> None:
pass
def __getstate__(self) -> Mapping[str, Any]:
return super().__getstate__()
def __setstate__(self, props: MutableMapping[str, Any]) -> None:
if "network_driver" not in props:
props["network_driver"] = "bridge"
super().__setstate__(props)
@override
async def create_code_runner(
self, event_producer: EventProducer, *, client_features: frozenset[str], api_version: int
) -> AbstractCodeRunner:
return await DockerCodeRunner.new(
self.kernel_id,
self.session_id,
event_producer,
kernel_host="127.0.0.1", # repl ports are always bound to 127.0.0.1
repl_in_port=self.data["repl_in_port"],
repl_out_port=self.data["repl_out_port"],
exec_timeout=0,
client_features=client_features,
)
@override
async def get_completions(self, text: str, opts: Mapping[str, Any]) -> CodeCompletionResp:
if self.runner is None:
raise KernelRunnerNotInitializedError("Kernel runner is not initialized")
result = await self.runner.feed_and_get_completion(text, opts)
return CodeCompletionResp(result=result)
@override
async def check_status(self) -> dict[str, Any] | None:
if self.runner is None:
raise KernelRunnerNotInitializedError("Kernel runner is not initialized")
return await self.runner.feed_and_get_status()
@override
async def get_logs(self) -> dict[str, Any]:
container_id = self.data["container_id"]
async with closing_async(Docker()) as docker:
container = await docker.containers.get(container_id)
logs = await container.log(stdout=True, stderr=True, follow=False)
return {"logs": "".join(logs)}
@override
async def interrupt_kernel(self) -> dict[str, Any]:
if self.runner is None:
raise KernelRunnerNotInitializedError("Kernel runner is not initialized")
await self.runner.feed_interrupt()
return {"status": "finished"}
@override
async def start_service(self, service: str, opts: Mapping[str, Any]) -> dict[str, Any]:
if self.runner is None:
raise KernelRunnerNotInitializedError("Kernel runner is not initialized")
if self.data.get("block_service_ports", False):
return {
"status": "failed",
"error": "operation blocked",
}
for sport in self.service_ports:
if sport["name"] == service:
break
else:
return {"status": "failed", "error": "invalid service name"}
return await self.runner.feed_start_service({
"name": service,
"port": sport["container_ports"][0], # primary port
"ports": sport["container_ports"],
"protocol": sport["protocol"],
"options": opts,
})
@override
async def start_model_service(self, model_service: Mapping[str, Any]) -> dict[str, Any]:
if self.runner is None:
raise KernelRunnerNotInitializedError("Kernel runner is not initialized")
return await self.runner.feed_start_model_service(model_service)
@override
async def shutdown_service(self, service: str) -> None:
if self.runner is None:
raise KernelRunnerNotInitializedError("Kernel runner is not initialized")
await self.runner.feed_shutdown_service(service)
@override
async def get_service_apps(self) -> dict[str, Any]:
if self.runner is None:
raise KernelRunnerNotInitializedError("Kernel runner is not initialized")
return await self.runner.feed_service_apps()
def _get_commit_path(self, kernel_id: KernelId, subdir: str) -> tuple[Path, Path]:
base_commit_path: Path = self.agent_config["agent"]["image-commit-path"]
commit_path = base_commit_path / subdir
lock_path = commit_path / "lock" / str(kernel_id)
return commit_path, lock_path
@override
async def check_duplicate_commit(self, kernel_id: KernelId, subdir: str) -> CommitStatus:
_, lock_path = self._get_commit_path(kernel_id, subdir)
if lock_path.exists():
return CommitStatus.ONGOING
return CommitStatus.READY
@override
async def commit(
self,
kernel_id: KernelId,
subdir: str,
*,
canonical: str | None = None,
filename: str | None = None,
extra_labels: dict[str, str] | None = None,
) -> None:
if extra_labels is None:
extra_labels = {}
if self.runner is None:
raise KernelRunnerNotInitializedError("Kernel runner is not initialized")
loop = asyncio.get_running_loop()
path, lock_path = self._get_commit_path(kernel_id, subdir)
container_id: str = str(self.data["container_id"])
try:
Path(path).mkdir(exist_ok=True, parents=True)
Path(lock_path).parent.mkdir(exist_ok=True, parents=True)
except ValueError as e: # parent_path does not start with work_dir!
raise ValueError("malformed committed path.") from e
def _write_chunks(
fileobj: gzip.GzipFile,
q: janus._SyncQueueProxy[bytes | Sentinel],
) -> None:
while True:
chunk = q.get()
if chunk is Sentinel.TOKEN:
return
fileobj.write(chunk)
q.task_done()
try:
async with FileLock(path=lock_path, timeout=0.1, remove_when_unlock=True):
log.info("Container (k: {}) is being committed", kernel_id)
async with closing_async(Docker()) as docker:
# There is a known issue at certain versions of Docker Engine
# which prevents container from being committed when request config body is empty
# https://github.com/moby/moby/issues/45543
docker_info = await docker.system.info()
docker_version = docker_info["ServerVersion"]
major, _, patch = docker_version.split(".", maxsplit=2)
config = None
if (int(major) == 23 and int(patch) < 8) or (
int(major) == 24 and int(patch) < 1
):
config = {"ContainerSpec": {}}
container = docker.containers.container(container_id)
changes: list[str] = []
for label_name, label_value in extra_labels.items():
changes.append(f"LABEL {label_name}={label_value}")
if canonical:
if ":" in canonical:
repo, tag = canonical.rsplit(":", maxsplit=1)
else:
repo, tag = canonical, "latest"
log.debug("tagging image as {}:{}", repo, tag)
else:
repo, tag = None, None
# TODO:
# - After aiodocker supports commit() timeout, set timeout there
# - Impl Docker client wrapper
commit_timeout = aiohttp.ClientTimeout(
total=self.agent_config["api"]["commit-timeout"]
)
docker.session._timeout = commit_timeout
response: Mapping[str, Any] = await container.commit(
changes=changes or None,
repository=repo,
tag=tag,
config=config,
)
image_id = response["Id"]
if filename:
filepath = path / filename
try:
q: janus.Queue[bytes | Sentinel] = janus.Queue(
maxsize=DEFAULT_INFLIGHT_CHUNKS
)
async with docker._query(f"images/{image_id}/get") as tb_resp:
with gzip.open(filepath, "wb") as fileobj:
write_task = loop.run_in_executor(
None,
functools.partial(
_write_chunks,
fileobj,
q.sync_q,
),
)
try:
await asyncio.sleep(0) # let write_task get started
async for chunk in tb_resp.content.iter_chunked(
DEFAULT_CHUNK_SIZE
):
await q.async_q.put(chunk)
finally:
await q.async_q.put(Sentinel.TOKEN)
await write_task
finally:
await docker.images.delete(image_id)
except TimeoutError:
log.warning("Session is already being committed.")
@override
async def accept_file(self, container_path: os.PathLike[str] | str, filedata: bytes) -> None:
loop = current_loop()
host_work_dir: Path = (
self.agent_config["container"]["scratch-root"] / str(self.kernel_id) / "work"
)
host_abspath = (host_work_dir / container_path).resolve(strict=False)
if not host_abspath.is_relative_to(host_work_dir):
raise PermissionError("Not allowed to upload files outside /home/work")
def _write_to_disk() -> None:
host_abspath.parent.mkdir(parents=True, exist_ok=True)
host_abspath.write_bytes(filedata)
try:
await loop.run_in_executor(None, _write_to_disk)
except OSError as e:
raise RuntimeError(
f"{self.kernel_id}: writing uploaded file failed: {container_path} -> {host_abspath} ({e!r})"
) from e
@override
async def download_file(self, container_path: os.PathLike[str] | str) -> bytes:
container_id = self.data["container_id"]
container_home_path = PurePosixPath("/home/work")
container_abspath = PurePosixPath(os.path.normpath(container_home_path / container_path))
if not container_abspath.is_relative_to(container_home_path):
raise PermissionError("You cannot download files outside /home/work")
async with closing_async(Docker()) as docker:
container = docker.containers.container(container_id)
try:
with await container.get_archive(str(container_abspath)) as tarobj:
# FIXME: Replace this API call to a streaming version and cut the download if
# the downloaded size exceeds the limit.
if tarobj.fileobj is None:
raise SubprocessStreamError("Tar file object is not available")
tar_fobj = cast(io.BufferedIOBase, tarobj.fileobj)
tar_fobj.seek(0, io.SEEK_END)
tar_size = tar_fobj.tell()
if tar_size > 1048576:
raise ValueError("Too large archive file exceeding 1 MiB")
tar_fobj.seek(0, io.SEEK_SET)
tarbytes = tar_fobj.read()
except DockerError as e:
raise RuntimeError(f"Could not download the archive to: {container_abspath}") from e
return tarbytes
@override
async def download_single(self, container_path: os.PathLike[str] | str) -> bytes:
container_id = self.data["container_id"]
container_home_path = PurePosixPath("/home/work")
container_abspath = PurePosixPath(os.path.normpath(container_home_path / container_path))
if not container_abspath.is_relative_to(container_home_path):
raise PermissionError("You cannot download files outside /home/work")
async with closing_async(Docker()) as docker:
container = docker.containers.container(container_id)
try:
with await container.get_archive(str(container_abspath)) as tarobj:
# FIXME: Replace this API call to a streaming version and cut the download if
# the downloaded size exceeds the limit.
if tarobj.fileobj is None:
raise SubprocessStreamError("Tar file object is not available")
tar_fobj = cast(io.BufferedIOBase, tarobj.fileobj)
tar_fobj.seek(0, io.SEEK_END)
tar_size = tar_fobj.tell()
if tar_size > 1048576:
raise ValueError("Too large archive file exceeding 1 MiB")
tar_fobj.seek(0, io.SEEK_SET)
if len(tarobj.getnames()) > 1:
raise ValueError(
f"Expected a single-file archive but found multiple files from {container_abspath}"
)
inner_fname = tarobj.getnames()[0]
inner_fobj = tarobj.extractfile(inner_fname)
if not inner_fobj:
raise ValueError(
f"Could not read {inner_fname!r} the archive file {container_abspath}"
)
# FYI: To get the size of extracted file, seek and tell with inner_fobj.
content_bytes = inner_fobj.read()
except DockerError as e:
raise RuntimeError(f"Could not download the archive to: {container_abspath}") from e
return content_bytes
@override
async def list_files(self, container_path: os.PathLike[str] | str) -> dict[str, Any]:
container_id = self.data["container_id"]
# Confine the lookable paths in the home directory
container_home_path = PurePosixPath("/home/work")
container_abspath = PurePosixPath(os.path.normpath(container_home_path / container_path))
if not container_abspath.is_relative_to(container_home_path):
raise PermissionError("You cannot list files outside /home/work")
# Gather individual file information in the target path.
code = textwrap.dedent(
"""
import json
import os
import stat
import sys
files = []
for f in os.scandir(sys.argv[1]):
fstat = f.stat(follow_symlinks=False)
ctime = fstat.st_ctime # TODO: way to get concrete create time?
mtime = fstat.st_mtime
atime = fstat.st_atime
files.append({
'mode': stat.filemode(fstat.st_mode),
'size': fstat.st_size,
'ctime': ctime,
'mtime': mtime,
'atime': atime,
'filename': f.name,
})
print(json.dumps(files))
"""
)
proc = await asyncio.create_subprocess_exec(
*[
"docker",
"exec",
container_id,
"/opt/backend.ai/bin/python",
"-c",
code,
str(container_abspath),
],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
raw_out, raw_err = await proc.communicate()
out = raw_out.decode("utf-8")
err = raw_err.decode("utf-8")
return {"files": out, "errors": err, "abspath": str(container_path)}
@override
async def notify_event(self, evdata: AgentEventData) -> None:
if self.runner is None:
raise KernelRunnerNotInitializedError("Kernel runner is not initialized")
await self.runner.feed_event(evdata)
class DockerCodeRunner(AbstractCodeRunner):
kernel_host: str
repl_in_port: int
repl_out_port: int
def __init__(
self,
kernel_id: KernelId,
session_id: SessionId,
event_producer: EventProducer,
*,
kernel_host: str,
repl_in_port: int,
repl_out_port: int,
exec_timeout: int = 0,
client_features: frozenset[str] | None = None,
) -> None:
super().__init__(
kernel_id,
session_id,
event_producer,
exec_timeout=exec_timeout,
client_features=client_features,
)
self.kernel_host = kernel_host
self.repl_in_port = repl_in_port
self.repl_out_port = repl_out_port
@override
async def get_repl_in_addr(self) -> str:
return f"tcp://{self.kernel_host}:{self.repl_in_port}"
@override
async def get_repl_out_addr(self) -> str:
return f"tcp://{self.kernel_host}:{self.repl_out_port}"
async def prepare_krunner_env_impl(distro: str, entrypoint_name: str) -> tuple[str, str | None]:
docker = Docker()
arch = get_arch_name()
current_version = int(
Path(
str(
files(f"ai.backend.krunner.{entrypoint_name}").joinpath(
f"./krunner-version.{distro}.txt"
)
)
)
.read_text()
.strip()
)
volume_name = f"backendai-krunner.v{current_version}.{arch}.{distro}"
extractor_image = "backendai-krunner-extractor:latest"
try:
for item in await docker.images.list():
if item["RepoTags"] is None or len(item["RepoTags"]) == 0:
continue
if item["RepoTags"][0] == extractor_image:
break
else:
log.info("preparing the Docker image for krunner extractor...")
extractor_archive = str(
files("ai.backend.runner").joinpath(f"krunner-extractor.img.{arch}.tar.xz")
)
with lzma.open(extractor_archive, "rb") as reader:
image_tar = reader.read()
proc = await asyncio.create_subprocess_exec(
*["docker", "load"], stdin=asyncio.subprocess.PIPE
)
await proc.communicate(input=image_tar)
if proc.returncode != 0:
raise RuntimeError("loading krunner extractor image has failed!")
log.info("checking krunner-env for {}...", distro)
do_create = False
try:
vol = DockerVolume(docker, volume_name) # type: ignore[no-untyped-call]
await vol.show() # type: ignore[no-untyped-call]
# Instead of checking the version from txt files inside the volume,
# we check the version via the volume name and its existence.
# This is because:
# - to avoid overwriting of volumes in use.
# - the name comparison is quicker than reading actual files.
except DockerError as e:
if e.status == 404:
do_create = True
if do_create:
archive_path = Path(
str(
files(f"ai.backend.krunner.{entrypoint_name}").joinpath(
f"krunner-env.{distro}.{arch}.tar.xz"
)
)
).resolve()
if not archive_path.exists():
log.warning("krunner environment for {} ({}) is not supported!", distro, arch)
else:
log.info("populating {} volume version {}", volume_name, current_version)
await docker.volumes.create({ # type: ignore[no-untyped-call]
"Name": volume_name,
"Driver": "local",
})
extractor_path = Path(
str(files("ai.backend.runner").joinpath("krunner-extractor.sh"))
).resolve()
proc = await asyncio.create_subprocess_exec(*[
"docker",
"run",
"--rm",
"-i",
"-v",
f"{archive_path}:/root/archive.tar.xz",
"-v",
f"{extractor_path}:/root/krunner-extractor.sh",
"-v",
f"{volume_name}:/root/volume",
"-e",
f"KRUNNER_VERSION={current_version}",
extractor_image,
"/root/krunner-extractor.sh",
])
if await proc.wait() != 0:
raise RuntimeError("extracting krunner environment has failed!")
except Exception:
log.exception("unexpected error")
return distro, None
finally:
await docker.close()
return distro, volume_name
async def prepare_krunner_env(_local_config: Mapping[str, Any]) -> Mapping[str, str]:
"""
Check if the volume "backendai-krunner.{distro}.{arch}" exists and is up-to-date.
If not, automatically create it and update its content from the packaged pre-built krunner
tar archives.
"""
all_distros: list[tuple[str, str]] = []
entry_prefix = "backendai_krunner_v10"
for entrypoint in scan_entrypoints(entry_prefix):
log.debug("loading krunner pkg: {}", entrypoint.module)
plugin = entrypoint.load()
await plugin.init({}) # currently does nothing
provided_versions = (
Path(str(files(f"ai.backend.krunner.{entrypoint.name}").joinpath("versions.txt")))
.read_text()
.splitlines()
)
all_distros.extend((distro, entrypoint.name) for distro in provided_versions)
tasks = []
async with TaskGroup() as tg:
for distro, entrypoint_name in all_distros:
tasks.append(tg.create_task(prepare_krunner_env_impl(distro, entrypoint_name)))
distro_volumes = [t.result() for t in tasks if not t.cancelled()]
result = {}
for distro_name_and_version, volume_name in distro_volumes:
if volume_name is None:
continue
result[distro_name_and_version] = volume_name
return result
LinuxKit_IPTABLES_RULE = re.compile(
r"DNAT\s+tcp\s+\-\-\s+anywhere\s+169\.254\.169\.254\s+tcp dpt:http to:127\.0\.0\.1:50128"
)
LinuxKit_CMD_EXEC_PREFIX = [
"docker",
"run",
"--rm",
"-i",
"--privileged",
"--pid=host",
"linuxkit-nsenter:latest",
]
async def prepare_kernel_metadata_uri_handling(local_config: AgentUnifiedConfig) -> None:
if local_config.agent.docker_mode == "linuxkit":
# Docker Desktop mode
arch = get_arch_name()
proxy_worker_binary = str(
files("ai.backend.agent.docker").joinpath(f"linuxkit-metadata-proxy-worker.{arch}.bin")
)
proxy_path = Path("/tmp/backend.ai/linuxkit-metadata-proxy")
shutil.copyfile(proxy_worker_binary, proxy_path)
proxy_path.chmod(0o755)
server_port = local_config.agent.metadata_server_port
# Prepare proxy worker container
proxy_worker_container = PersistentServiceContainer(
"linuxkit-nsenter:latest",
{
"Cmd": [
"/bin/sh",
"-c",
(
"ctr -n services.linuxkit t kill --exec-id metaproxy docker;ctr -n"
" services.linuxkit t exec --exec-id metaproxy docker"
" /host_mnt/tmp/backend.ai/linuxkit-metadata-proxy -remote-port"
f" {server_port}"
),
],
"HostConfig": {
"PidMode": "host",
"Privileged": True,
},
},
name="linuxkit-nsenter",
)
await proxy_worker_container.ensure_running_latest()
# Check if iptables rule is propagated on LinuxKit VM properly
log.info("Checking metadata URL iptables rule ...")
proc = await asyncio.create_subprocess_exec(
*(LinuxKit_CMD_EXEC_PREFIX + ["/sbin/iptables", "-n", "-t", "nat", "-L", "PREROUTING"]),
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
await proc.wait()
if proc.stdout is None:
raise SubprocessStreamError("Subprocess stdout is not available")
raw_rules = await proc.stdout.read()
rules = raw_rules.decode()
if LinuxKit_IPTABLES_RULE.search(rules) is None:
proc = await asyncio.create_subprocess_exec(
*(
LinuxKit_CMD_EXEC_PREFIX
+ [
"/sbin/iptables",
"-t",
"nat",
"-I",
"PREROUTING",
"-d",
"169.254.169.254",
"-p",
"tcp",
"--dport",
"80",
"-j",
"DNAT",
"--to-destination",
"127.0.0.1:50128",
]
)
)
await proc.wait()
log.info("Inserted the iptables rules.")
else:
log.info("The iptables rule already exists.")