Skip to content

Commit

Permalink
[Monitoring] Update auto scaling scripts to use metrics by PK API (#4355
Browse files Browse the repository at this point in the history
)
  • Loading branch information
gchhablani committed Apr 26, 2024
1 parent 6535fe9 commit be3c597
Show file tree
Hide file tree
Showing 4 changed files with 47 additions and 31 deletions.
15 changes: 8 additions & 7 deletions scripts/monitoring/auto_scale_ec2_workers.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,15 +72,17 @@ def start_instance(challenge, evalai_interface):
)


def start_or_stop_workers(challenge, challenge_metrics, evalai_interface):
def start_or_stop_workers(challenge, evalai_interface):
try:
challenge_metrics = evalai_interface.get_challenge_submission_metrics_by_pk(challenge["id"])
pending_submissions = get_pending_submission_count(challenge_metrics)
except Exception: # noqa: F841
except Exception as e: # noqa: F841
print(
"Unable to get the pending submissions for challenge ID: {}, Title: {}. Skipping.".format(
challenge["id"], challenge["title"]
)
)
print(e)
return

print("Pending Submissions: {}, Challenge PK: {}, Title: {}".format(pending_submissions, challenge["id"], challenge["title"]))
Expand All @@ -94,11 +96,11 @@ def start_or_stop_workers(challenge, challenge_metrics, evalai_interface):


# TODO: Factor in limits for the APIs
def start_or_stop_workers_for_challenges(response, metrics, evalai_interface):
def start_or_stop_workers_for_challenges(response, evalai_interface):
for challenge in response["results"]:
if challenge["uses_ec2_worker"]:
try:
start_or_stop_workers(challenge, metrics[str(challenge["id"])], evalai_interface)
start_or_stop_workers(challenge, evalai_interface)
except Exception as e:
print(e)

Expand All @@ -112,12 +114,11 @@ def create_evalai_interface(auth_token, evalai_endpoint):
def start_job():
evalai_interface = create_evalai_interface(auth_token, evalai_endpoint)
response = evalai_interface.get_challenges()
metrics = evalai_interface.get_challenges_submission_metrics()
start_or_stop_workers_for_challenges(response, metrics, evalai_interface)
start_or_stop_workers_for_challenges(response, evalai_interface)
next_page = response["next"]
while next_page is not None:
response = evalai_interface.make_request(next_page, "GET")
start_or_stop_workers_for_challenges(response, metrics, evalai_interface)
start_or_stop_workers_for_challenges(response, evalai_interface)
next_page = response["next"]


Expand Down
18 changes: 9 additions & 9 deletions scripts/monitoring/auto_scale_eks_nodes.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@

# Env Variables
ENV = os.environ.get("ENV", "production")
AUTH_TOKEN = os.environ.get("AUTH_TOKEN")
STAFF_AUTH_TOKEN = os.environ.get("AUTH_TOKEN")
EVALAI_ENDPOINT = os.environ.get("API_HOST_URL", "https://eval.ai")

json_path = os.environ.get("JSON_PATH", "~/prod_eks_auth_tokens.json")
Expand Down Expand Up @@ -107,8 +107,7 @@ def stop_eks_worker(challenge, evalai_interface, aws_keys):
return response


def get_pending_submission_count_by_pk(metrics, challenge_pk):
challenge_metrics = metrics[str(challenge_pk)]
def get_pending_submission_count(challenge_metrics):
pending_submissions = 0
for status in ["running", "submitted", "queued", "resuming"]:
pending_submissions += challenge_metrics.get(status, 0)
Expand Down Expand Up @@ -151,15 +150,17 @@ def scale_up_workers(challenge, original_desired_size, pending_submissions, eval
)


def scale_up_or_down_workers(challenge, metrics, evalai_interface, aws_keys, scale_up_desired_size):
def scale_up_or_down_workers(challenge, evalai_interface, staff_evalai_interface, aws_keys, scale_up_desired_size):
try:
pending_submissions = get_pending_submission_count_by_pk(metrics, challenge["id"])
except Exception: # noqa: F841
challenge_metrics = staff_evalai_interface.get_challenge_submission_metrics_by_pk(challenge["id"])
pending_submissions = get_pending_submission_count(challenge_metrics)
except Exception as e: # noqa: F841
print(
"Unable to get the pending submissions for challenge ID: {}, Title: {}. Skipping.".format(
challenge["id"], challenge["title"]
)
)
print(e)
return

eks_client, cluster_name, nodegroup_name = get_eks_meta(
Expand Down Expand Up @@ -209,8 +210,7 @@ def scale_up_or_down_workers(challenge, metrics, evalai_interface, aws_keys, sca
def start_job():

# Get metrics
evalai_interface = create_evalai_interface(AUTH_TOKEN)
metrics = evalai_interface.get_challenges_submission_metrics()
staff_evalai_interface = create_evalai_interface(STAFF_AUTH_TOKEN)

for challenge_id, details in INCLUDED_CHALLENGE_PKS.items():
# Auth Token
Expand All @@ -237,7 +237,7 @@ def start_job():
), "Challenge ID: {}, Title: {} is either not docker-based or remote-evaluation. Skipping.".format(
challenge["id"], challenge["title"]
)
scale_up_or_down_workers(challenge, metrics, evalai_interface, aws_keys, scale_up_desired_size)
scale_up_or_down_workers(challenge, evalai_interface, staff_evalai_interface, aws_keys, scale_up_desired_size)
time.sleep(1)
except Exception as e:
print(e)
Expand Down
37 changes: 22 additions & 15 deletions scripts/monitoring/auto_scale_workers.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,19 +88,27 @@ def scale_up_or_down_workers(challenge, challenge_metrics):


# TODO: Factor in limits for the APIs
def scale_up_or_down_workers_for_challenges(response, metrics):
def scale_up_or_down_workers_for_challenge(challenge, challenge_metrics):
if ENV == "prod":
try:
if challenge["remote_evaluation"] is False:
scale_up_or_down_workers(challenge, challenge_metrics)
except Exception as e:
print(e)
else:
try:
scale_up_or_down_workers(challenge, challenge_metrics)
except Exception as e:
print(e)


def scale_up_or_down_workers_for_challenges(response, evalai_interface):
for challenge in response["results"]:
if ENV == "prod":
try:
if challenge["remote_evaluation"] is False:
scale_up_or_down_workers(challenge, metrics[str(challenge["id"])])
except Exception as e:
print(e)
else:
try:
scale_up_or_down_workers(challenge, metrics[str(challenge["id"])])
except Exception as e:
print(e)
try:
challenge_metrics = evalai_interface.get_challenge_submission_metrics_by_pk(challenge["id"])
scale_up_or_down_workers_for_challenge(challenge, challenge_metrics)
except Exception as e:
print(e)


def create_evalai_interface(auth_token, evalai_endpoint):
Expand All @@ -112,12 +120,11 @@ def create_evalai_interface(auth_token, evalai_endpoint):
def start_job():
evalai_interface = create_evalai_interface(auth_token, evalai_endpoint)
response = evalai_interface.get_challenges()
metrics = evalai_interface.get_challenges_submission_metrics()
scale_up_or_down_workers_for_challenges(response, metrics)
scale_up_or_down_workers_for_challenges(response, evalai_interface)
next_page = response["next"]
while next_page is not None:
response = evalai_interface.make_request(next_page, "GET")
scale_up_or_down_workers_for_challenges(response, metrics)
scale_up_or_down_workers_for_challenges(response, evalai_interface)
next_page = response["next"]


Expand Down
8 changes: 8 additions & 0 deletions scripts/monitoring/evalai_interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
"get_challenges": "/api/challenges/challenge/all/all/all",
"get_submissions_for_challenge": "/api/jobs/challenge/{}/submission/",
"get_challenges_submission_metrics": "/api/challenges/challenge/get_submission_metrics",
"get_challenge_submission_metrics_by_pk": "/api/challenges/challenge/get_submission_metrics_by_pk/{}/",
"manage_ec2_instance": "/api/challenges/{}/manage_ec2_instance/{}",
"get_ec2_instance_details": "/api/challenges/{}/get_ec2_instance_details/",
}
Expand Down Expand Up @@ -144,6 +145,13 @@ def get_challenges_submission_metrics(self):
response = self.make_request(url, "GET")
return response

def get_challenge_submission_metrics_by_pk(self, challenge_pk):
url_template = URLS.get("get_challenge_submission_metrics_by_pk")
url = url_template.format(challenge_pk)
url = self.return_url_per_environment(url)
response = self.make_request(url, "GET")
return response

def get_ec2_instance_details(self, challenge_pk):
url_template = URLS.get("get_ec2_instance_details")
url = url_template.format(challenge_pk)
Expand Down

0 comments on commit be3c597

Please sign in to comment.