Examples
Here are several examples that demonstrate how to use the functional API.
Initialization of the API Client
Implicit configuration from environment variables
from ai.backend.client.session import Session

def main():
    with Session() as api_session:
        print(api_session.System.get_versions())

if __name__ == "__main__":
    main()
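The implicit configuration reads its settings from environment variables such as BACKEND_ENDPOINT, BACKEND_ACCESS_KEY, and BACKEND_SECRET_KEY. A minimal sketch with placeholder credentials (in practice, export these in your shell instead of setting them in code):

import os

# Placeholder values; substitute your own endpoint and keypair.
os.environ["BACKEND_ENDPOINT"] = "https://api.backend.ai.local"
os.environ["BACKEND_ACCESS_KEY"] = "AKIA..."  # your API access key
os.environ["BACKEND_SECRET_KEY"] = "..."      # your API secret key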
See also
The interface of API client session objects: ai.backend.client.session
Explicit configuration
from ai.backend.client.config import APIConfig
from ai.backend.client.session import Session

def main():
    config = APIConfig(
        endpoint="https://api.backend.ai.local",
        endpoint_type="api",
        domain="default",
        group="default",  # the default project name to use
    )
    with Session(config=config) as api_session:
        print(api_session.System.get_versions())

if __name__ == "__main__":
    main()
See also
The interface of API client session objects: ai.backend.client.session
Asyncio-native API session
import asyncio
from ai.backend.client.session import AsyncSession

async def main():
    async with AsyncSession() as api_session:
        print(await api_session.System.get_versions())

if __name__ == "__main__":
    asyncio.run(main())
See also
The interface of API client session objects: ai.backend.client.session
Working with Compute Sessions
Note
From here on, we omit the main() function structure in the sample code.
Listing currently running compute sessions
import functools
from ai.backend.client.session import Session

with Session() as api_session:
    fetch_func = functools.partial(
        api_session.ComputeSession.paginated_list,
        status="RUNNING",
    )
    current_offset = 0
    while True:
        result = fetch_func(page_offset=current_offset, page_size=20)
        if result.total_count == 0:
            # no items found
            break
        current_offset += len(result.items)
        for item in result.items:
            print(item)
        if current_offset >= result.total_count:
            # end of the list
            break
Creating and destroying a compute session
from ai.backend.client.session import Session

with Session() as api_session:
    my_session = api_session.ComputeSession.get_or_create(
        "python:3.9-ubuntu20.04",      # registered container image name
        mounts=["mydata", "mymodel"],  # vfolder names
        resources={"cpu": 8, "mem": "32g", "cuda.device": 2},
    )
    print(my_session.id)
    my_session.destroy()
Accessing Container Applications
The set of launchable apps varies by session. Here we illustrate how to launch a ttyd (web-based terminal) app, which is available in all Backend.AI sessions.
Note
This example is only applicable to Backend.AI clusters with AppProxy v2 enabled and configured. AppProxy v2 ships only with the enterprise version of Backend.AI.
The ComputeSession.start_service() API
import requests
from ai.backend.client.session import Session

app_name = "ttyd"

with Session() as api_session:
    sess = api_session.ComputeSession.get_or_create(...)
    service_info = sess.start_service(app_name, login_session_token="dummy")
    app_proxy_url = f"{service_info['wsproxy_addr']}/v2/proxy/{service_info['token']}/{sess.id}/add?app={app_name}"
    resp = requests.get(app_proxy_url)
    body = resp.json()
    auth_url = body["url"]
    print(auth_url)  # opening this link in a browser will navigate the user to the terminal session
Added in version 23.09.8: ai.backend.client.func.session.ComputeSession.start_service()
Set login_session_token to a dummy string like "dummy"; it is a remnant of the legacy interface and is no longer used.
Alternatively, in versions before 23.09.8, you may use a raw ai.backend.client.Request to call the server-side start_service API.
import asyncio
import aiohttp
from ai.backend.client.request import Request
from ai.backend.client.session import AsyncSession

app_name = "ttyd"

async def main():
    async with AsyncSession() as api_session:
        sess = await api_session.ComputeSession.get_or_create(...)
        rqst = Request(
            "POST",
            f"/session/{sess.id}/start-service",
        )
        rqst.set_json({"app": app_name, "login_session_token": "dummy"})
        async with rqst.fetch() as resp:
            body = await resp.json()
        app_proxy_url = f"{body['wsproxy_addr']}/v2/proxy/{body['token']}/{sess.id}/add?app={app_name}"
        async with aiohttp.ClientSession() as client:
            async with client.get(app_proxy_url) as resp:
                body = await resp.json()
                auth_url = body["url"]
                print(auth_url)  # opening this link in a browser will navigate the user to the terminal session

if __name__ == "__main__":
    asyncio.run(main())
Code Execution via API
Synchronous mode
Snippet execution (query mode)
This is the minimal code to execute a code snippet with this client SDK.
import sys
from ai.backend.client.session import Session

with Session() as api_session:
    my_session = api_session.ComputeSession.get_or_create("python:3.9-ubuntu20.04")
    code = 'print("hello world")'
    mode = "query"
    run_id = None
    try:
        while True:
            result = my_session.execute(run_id, code, mode=mode)
            run_id = result["runId"]  # keeps track of this particular run loop
            for rec in result.get("console", []):
                if rec[0] == "stdout":
                    print(rec[1], end="", file=sys.stdout)
                elif rec[0] == "stderr":
                    print(rec[1], end="", file=sys.stderr)
                else:
                    handle_media(rec)
            sys.stdout.flush()
            if result["status"] == "finished":
                break
            else:
                mode = "continued"
                code = ""
    finally:
        my_session.destroy()
You need to take care of client_token because it determines whether to reuse kernel sessions or not. Backend.AI cloud has an idle timeout after which it terminates long-idle kernel sessions, but within that timeout, any kernel creation request with the same client_token lets Backend.AI cloud reuse the existing kernel.
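For instance, the following sketch creates and then reuses a single kernel session via the same client_token (the session name "my-persistent-session" is an arbitrary example):

from ai.backend.client.session import Session

with Session() as api_session:
    sess1 = api_session.ComputeSession.get_or_create(
        "python:3.9-ubuntu20.04",
        client_token="my-persistent-session",
    )
    sess2 = api_session.ComputeSession.get_or_create(
        "python:3.9-ubuntu20.04",
        client_token="my-persistent-session",
    )
    # Within the idle timeout, both calls resolve to the same kernel session.
    assert sess1.id == sess2.id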
Script execution (batch mode)
After creating the session, you first need to upload the files and then construct an opts struct.
import sys
from ai.backend.client.session import Session

with Session() as session:
    compute_sess = session.ComputeSession.get_or_create("python:3.6-ubuntu18.04")
    compute_sess.upload(["mycode.py", "setup.py"])
    code = ""
    mode = "batch"
    run_id = None
    opts = {
        "build": "*",  # calls "python setup.py install"
        "exec": "python mycode.py arg1 arg2",
    }
    try:
        while True:
            result = compute_sess.execute(run_id, code, mode=mode, opts=opts)
            opts.clear()
            run_id = result["runId"]
            for rec in result.get("console", []):
                if rec[0] == "stdout":
                    print(rec[1], end="", file=sys.stdout)
                elif rec[0] == "stderr":
                    print(rec[1], end="", file=sys.stderr)
                else:
                    handle_media(rec)
            sys.stdout.flush()
            if result["status"] == "finished":
                break
            else:
                mode = "continued"
                code = ""
    finally:
        compute_sess.destroy()
Handling user inputs
Inside the while-loop around compute_sess.execute() above, change the if-block for result["status"] as follows:
...
if result["status"] == "finished":
    break
elif result["status"] == "waiting-input":
    mode = "input"
    if result["options"].get("is_password", False):
        code = getpass.getpass()  # requires "import getpass" at the top
    else:
        code = input()
else:
    mode = "continued"
    code = ""
...
A common gotcha is to miss setting mode = "input". Be careful!
Handling multi-media outputs
The handle_media() function used in the above examples would look like:
def handle_media(record):
    media_type = record[0]  # MIME-type string
    media_data = record[1]  # content
    ...
The exact method to process media_data depends on the media_type. Currently, the following behaviors are well-defined:

For (binary-format) images, the content is a dataURI-encoded string.
For SVG (scalable vector graphics) images, the content is an XML string.
For application/x-sorna-drawing, the content is a JSON string that represents a set of vector drawing commands to be replayed on the client side (e.g., JavaScript in browsers).
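For instance, handle_media() could dispatch on these media types as in the following sketch (the file names are arbitrary placeholders; a real client would render the media instead of saving it):

import base64
import json

def handle_media(record):
    media_type = record[0]  # MIME-type string
    media_data = record[1]  # content
    if media_type == "image/svg+xml":
        # SVG images arrive as raw XML strings.
        with open("output.svg", "w") as f:
            f.write(media_data)
    elif media_type.startswith("image/"):
        # Binary-format images arrive as dataURI-encoded strings.
        _, encoded = media_data.split(",", 1)  # strip the "data:...;base64," prefix
        with open("output.img", "wb") as f:
            f.write(base64.b64decode(encoded))
    elif media_type == "application/x-sorna-drawing":
        # A JSON string of vector drawing commands; here we just parse it
        # (assuming a JSON array) instead of replaying it.
        commands = json.loads(media_data)
        print(f"(received {len(commands)} drawing commands)")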
Asynchronous mode
The async version provides all the sync-version interfaces as coroutines, and comes with additional features such as stream_execute(), which streams the execution results via WebSockets, and stream_pty() for interactive terminal streaming.
import asyncio
import getpass
import json
import sys
import aiohttp
from ai.backend.client.session import AsyncSession

async def main():
    async with AsyncSession() as api_session:
        compute_sess = await api_session.ComputeSession.get_or_create(
            "python:3.6-ubuntu18.04",
            client_token="mysession",
        )
        code = 'print("hello world")'
        mode = "query"
        try:
            async with compute_sess.stream_execute(code, mode=mode) as stream:
                # no need for an explicit run_id since the WebSocket connection represents it!
                async for result in stream:
                    if result.type != aiohttp.WSMsgType.TEXT:
                        continue
                    result = json.loads(result.data)
                    for rec in result.get("console", []):
                        if rec[0] == "stdout":
                            print(rec[1], end="", file=sys.stdout)
                        elif rec[0] == "stderr":
                            print(rec[1], end="", file=sys.stderr)
                        else:
                            handle_media(rec)
                    sys.stdout.flush()
                    if result["status"] == "finished":
                        break
                    elif result["status"] == "waiting-input":
                        mode = "input"
                        if result["options"].get("is_password", False):
                            code = getpass.getpass()
                        else:
                            code = input()
                        await stream.send_text(code)
                    else:
                        mode = "continued"
                        code = ""
        finally:
            await compute_sess.destroy()

if __name__ == "__main__":
    asyncio.run(main())
Added in version 19.03.
Working with Model Service
In addition to a working AppProxy v2 deployment, model service requires a resource group configured to accept inference workloads.
Starting model service
from ai.backend.client.session import Session

with Session() as api_session:
    compute_sess = api_session.Service.create(
        "python:3.6-ubuntu18.04",  # container image to serve the model
        "Llama2-70B",              # model vfolder name
        1,                         # initial number of replicas
        service_name="Llama2-service",
        resources={"cuda.shares": 2, "cpu": 8, "mem": "64g"},
        open_to_public=False,
    )
If you set open_to_public=True, the endpoint accepts anonymous traffic without the authentication token (see below).
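In that case, requests can be made without the Authorization header, as in this sketch (assuming endpoint is obtained from the service info, as shown in the next example):

import requests

resp = requests.get(f"{endpoint}/v1/models")  # no Authorization header needed for public services
print(resp.json())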
Making a request to the model service endpoint
import requests
from ai.backend.client.session import Session

with Session() as api_session:
    compute_sess = api_session.Service.create(...)
    service_info = compute_sess.info()
    endpoint = service_info["url"]  # this value can be None if no successful inference service deployment has been made
    token_info = compute_sess.generate_api_token("3600s")
    token = token_info["token"]
    headers = {"Authorization": f"BackendAI {token}"}  # providing the token is not required for public model services
    resp = requests.get(f"{endpoint}/v1/models", headers=headers)
The token returned by the generate_api_token() method is a JSON Web Token (JWT), which conveys all the information required to authenticate the inference request. Once generated, it cannot be revoked. A token may have its own expiration date/time; the lifetime of a token is configured by the user who deploys the inference model, and currently there are no intrinsic minimum/maximum limits on the lifetime.
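Since it is a standard JWT, you can inspect the token's expiration time on the client side. Below is a minimal sketch, assuming the payload carries the conventional exp claim (an assumption; the exact claims are defined by the server):

import base64
import json
import time

def jwt_expiry(token: str) -> float:
    # A JWT consists of three base64url-encoded parts: header.payload.signature.
    payload_b64 = token.split(".")[1]
    payload_b64 += "=" * (-len(payload_b64) % 4)  # restore stripped padding
    payload = json.loads(base64.urlsafe_b64decode(payload_b64))
    return payload["exp"]  # assumes the standard "exp" claim is present

remaining = jwt_expiry(token) - time.time()  # "token" from the example above
print(f"token expires in {remaining:.0f} seconds")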
Added in version 23.09.