Unverified Commit a689e619 authored by yjjinjie's avatar yjjinjie Committed by GitHub
Browse files

[DLC] update the failed job and dlcclient request and user flow control (#5003)

parent b1a532ae
......@@ -7,6 +7,7 @@ import os
import pathlib
import sys
import traceback
import time
from argparse import ArgumentParser
# ref: https://help.aliyun.com/document_detail/203290.html?spm=a2c4g.11186623.6.727.6f9b5db6bzJh4x
from alibabacloud_pai_dlc20201203.client import Client
......@@ -83,18 +84,42 @@ if __name__ == "__main__":
response = client.create_job(req)
job_id = response.body.job_id
print('job id: ' + job_id)
print('job_id:' + job_id)
while True:
line = sys.stdin.readline().rstrip()
if line == 'update_status':
print('status:' + client.get_job(job_id).body.status)
# when the dlc sudden failure,such as 503,
# we will not get the status
# We'll keep getting the state until we get it
while True:
try:
# to avoid user flow control
time.sleep(60)
status = client.get_job(job_id).body.status
logging.info('job_id %s, client.get_job(job_id).body.status %s',job_id, status)
print('status:' + status)
break
except Exception as e:
logging.exception('dlc get status error: \n')
logging.info("exit job_id %s update status",job_id)
elif line == 'tracking_url':
#TODO: 1. get this url by api? 2. change this url in private dlc mode.
print('tracking_url:' + f'https://pai-dlc.console.aliyun.com/#/jobs/detail?jobId={job_id}&regionId={args.region}')
elif line == 'stop':
# when the dlc 503,we will not stop the job
# We'll keep stopping the job until we stop it
while True:
try:
# to avoid user flow control
time.sleep(60)
client.stop_job(job_id)
exit(0)
except Exception as e:
logging.error('DLC submit Exception: \n')
logging.error(e, exc_info=1)
logging.exception('dlc stop error: \n')
except Exception as e:
logging.exception('DLC submit Exception: \n')
......@@ -26,6 +26,7 @@ export class DlcClient {
// dlcUtil exception log dir
public logDir: string;
public pythonShellClient: undefined | PythonShell;
public status: string;
constructor(
type: string,
......@@ -65,6 +66,7 @@ export class DlcClient {
this.environmentId = environmentId;
this.userCommand = userCommand;
this.logDir = logDir;
this.status = '';
}
public submit(): Promise<string> {
......@@ -91,18 +93,40 @@ export class DlcClient {
]
});
this.log.debug(this.pythonShellClient.command);
this.pythonShellClient.on('message', function (envId: any) {
// received a message sent from the Python script (a simple "print" statement)
deferred.resolve(envId);
});
this.onMessage();
this.log.debug(`on message`);
this.monitorError(this.pythonShellClient, deferred);
this.log.debug(`monitor submit`);
const log = this.log;
this.pythonShellClient.on('message', (message: any) => {
const jobid = this.parseContent('job_id', message);
if (jobid !== '') {
log.debug(`reslove job_id ${jobid}`);
deferred.resolve(jobid);
}
});
return deferred.promise;
}
private onMessage(): void {
if (this.pythonShellClient === undefined) {
throw Error('python shell client not initialized!');
}
const log = this.log;
this.pythonShellClient.on('message', (message: any) => {
const status: string= this.parseContent('status', message);
if (status.length > 0) {
log.debug(`on message status: ${status}`)
this.status = status;
return;
}
});
}
public stop(): void {
if (this.pythonShellClient === undefined) {
this.log.debug(`python shell client not initialized!`);
throw Error('python shell client not initialized!');
}
this.log.debug(`send stop`);
this.pythonShellClient.send('stop');
}
......@@ -111,14 +135,17 @@ export class DlcClient {
if (this.pythonShellClient === undefined) {
throw Error('python shell client not initialized!');
}
this.log.debug(`send tracking_url`);
this.pythonShellClient.send('tracking_url');
const log = this.log;
this.pythonShellClient.on('message', (status: any) => {
const trackingUrl = this.parseContent('tracking_url', status);
if (trackingUrl !== '') {
log.debug(`trackingUrl:${trackingUrl}`);
deferred.resolve(trackingUrl);
}
});
this.monitorError(this.pythonShellClient, deferred);
return deferred.promise;
}
......@@ -128,47 +155,19 @@ export class DlcClient {
throw Error('python shell client not initialized!');
}
this.pythonShellClient.send('update_status');
this.pythonShellClient.on('message', (status: any) => {
let newStatus = this.parseContent('status', status);
if (newStatus === '') {
newStatus = oldStatus;
}
deferred.resolve(newStatus);
});
this.monitorError(this.pythonShellClient, deferred);
return deferred.promise;
}
public sendCommand(message: string): void {
if (this.pythonShellClient === undefined) {
throw Error('python shell client not initialized!');
}
this.log.debug(`command:${message}`);
this.pythonShellClient.send(`command:${message}`);
}
public receiveCommand(): Promise<string> {
const deferred: Deferred<string> = new Deferred<string>();
if (this.pythonShellClient === undefined) {
throw Error('python shell client not initialized!');
}
this.pythonShellClient.send('receive');
this.pythonShellClient.on('message', (command: any) => {
const message = this.parseContent('receive', command);
if (message !== '') {
deferred.resolve(JSON.parse(message))
if (this.status === '') {
this.status = oldStatus;
}
});
this.monitorError(this.pythonShellClient, deferred);
this.log.debug(`update_status:${this.status}`);
deferred.resolve(this.status);
return deferred.promise;
}
// Monitor error information in dlc python shell client
private monitorError(pythonShellClient: PythonShell, deferred: Deferred<any>): void {
const log = this.log;
pythonShellClient.on('error', function (error: any) {
deferred.reject(error);
});
pythonShellClient.on('close', function (error: any) {
log.info(`error:${error}`);
deferred.reject(error);
});
}
......
......@@ -4,6 +4,7 @@
import fs from 'fs';
import path from 'path';
import * as component from 'common/component';
import { Deferred } from 'ts-deferred';
import { getLogger, Logger } from 'common/log';
import { DlcConfig } from 'common/experimentConfig';
import { ExperimentStartupInfo } from 'common/experimentStartupInfo';
......@@ -16,6 +17,7 @@ import { MountedStorageService } from '../storages/mountedStorageService';
import { Scope } from 'typescript-ioc';
import { StorageService } from '../storageService';
import { getLogDir } from 'common/utils';
import { setTimeout } from 'timers/promises';
/**
* Collector DLC jobs info from DLC cluster, and update dlc job status locally
......@@ -55,6 +57,7 @@ export class DlcEnvironmentService extends EnvironmentService {
}
public async refreshEnvironmentsStatus(environments: EnvironmentInformation[]): Promise<void> {
const deferred: Deferred<void> = new Deferred<void>();
environments.forEach(async (environment) => {
const dlcClient = (environment as DlcEnvironmentInformation).dlcClient;
if (!dlcClient) {
......@@ -76,8 +79,11 @@ export class DlcEnvironmentService extends EnvironmentService {
environment.setStatus('SUCCEEDED');
break;
case 'FAILED':
// the job create failed,we will sleep(60) to create new job
await setTimeout(60000);
this.log.debug(`await 60s to create new job,DLC: job ${environment.id} is failed!`);
environment.setStatus('FAILED');
return Promise.reject(`DLC: job ${environment.envId} is failed!`);
break;
case 'STOPPED':
case 'STOPPING':
environment.setStatus('USER_CANCELED');
......@@ -86,6 +92,8 @@ export class DlcEnvironmentService extends EnvironmentService {
environment.setStatus('UNKNOWN');
}
});
deferred.resolve();
return deferred.promise;
}
public async startEnvironment(environment: EnvironmentInformation): Promise<void> {
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment