batch.py 8.24 KB
Newer Older
yuhai's avatar
yuhai committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
import os,sys,time
from itertools import zip_longest
from .job_status import JobStatus


class Batch(object) :
    def __init__ (self,
                  context, 
                  uuid_names = True) :
        self.context = context
        self.uuid_names = uuid_names
        if uuid_names:
            self.upload_tag_name = '%s_tag_upload' % self.context.job_uuid
            self.finish_tag_name = '%s_tag_finished' % self.context.job_uuid
            self.sub_script_name = '%s.sub' % self.context.job_uuid
            self.job_id_name = '%s_job_id' % self.context.job_uuid
        else:
            self.upload_tag_name = 'tag_upload'
            self.finish_tag_name = 'tag_finished'
            self.sub_script_name = 'run.sub'
            self.job_id_name = 'job_id'

    def check_status(self) :
        raise NotImplementedError('abstract method check_status should be implemented by derived class')        
        
    def default_resources(self, res) :
        raise NotImplementedError('abstract method sub_script_head should be implemented by derived class')        

    def sub_script_head(self, res) :
        raise NotImplementedError('abstract method sub_script_head should be implemented by derived class')        

    def sub_script_cmd(self, cmd, arg, res):
        raise NotImplementedError('abstract method sub_script_cmd should be implemented by derived class')        

    def exec_sub_script(self, script_str):
        raise NotImplementedError('abstract method exec_sub_script should be implemented by derived class')

    def check_before_sub(self, res):
        pass

    def sub_step_head(self, step_res=None):
        return ""

    def do_submit(self,
                  job_dirs,
                  cmds,
                  args = None, 
                  res = None,
                  outlog = 'log',
                  errlog = 'err',
                  para_deg = 1,
                  para_res = None):
        '''
        submit a single job, assuming that no job is running there.
        '''
        if res is None:
            res = self.default_resources(res)
        self.check_before_sub(res)
        script_str = self.sub_script(job_dirs, cmds, args=args, res=res, 
                                     outlog=outlog, errlog=errlog,
                                     para_deg=para_deg, para_res=para_res)
        self.exec_sub_script(script_str=script_str)

    def sub_script(self,
                   job_dirs,
                   cmds,
                   args = None,
                   res  = None,
                   outlog = 'log',
                   errlog = 'err',
                   para_deg = 1,
                   para_res = None) :
        """
        make submit script

        job_dirs(list):         directories of jobs. size: n_job
        cmds(list of list):     commands to be executed in each dir. size: n_job x n_cmd
        args(list of list):     args of commands. size: n_job x n_cmd
                                can be None
        res(dict):              resources available
        outlog(str):            file name for output
        errlog(str):            file name for error
        """
        res = self.default_resources(res)
        ret = self.sub_script_head(res)
        if not isinstance(job_dirs, list):
            job_dirs = [job_dirs]
        if not isinstance(cmds, list):
            cmds = [cmds]
        if not isinstance(cmds[0], list):
            cmds = [cmds for d in job_dirs]
        if args is None:
            args = [['' for c in jcmd] for jcmd in cmds]
        if not isinstance(para_res, list):
            para_res = [para_res for d in job_dirs]
        # loop over cmds 
        for jj, (jcmds, jargs) in enumerate(zip(zip_longest(*cmds), zip_longest(*args))):
            # for one cmd per dir
            ret += self._sub_script_inner(job_dirs,
                                          jcmds,
                                          jargs,
                                          res,
                                          jj,
                                          outlog=outlog,
                                          errlog=errlog,
                                          para_deg=para_deg,
                                          para_res=para_res)
        ret += '\nwait\n'
        ret += '\ntouch %s\n' % self.finish_tag_name
        return ret

    def submit(self,
               job_dirs,
               cmds,
               args = None,
               res = None,
               restart = False,
               sleep = 0,
               outlog = 'log',
               errlog = 'err',
               para_deg = 1,
               para_res = None):
        if restart:
            status = self.check_status()
            if status in [  JobStatus.unsubmitted, JobStatus.unknown, JobStatus.terminated ]:
                # dlog.debug('task restart point !!!')
                self.do_submit(job_dirs, cmds, args, res, outlog=outlog, errlog=errlog)
            elif status==JobStatus.waiting:
                pass
                # dlog.debug('task is waiting')
            elif status==JobStatus.running:
                pass
                # dlog.debug('task is running')
            elif status==JobStatus.finished:
                pass
                # dlog.debug('task is finished')
            else:
                raise RuntimeError('unknow job status, must be wrong')
        else:
            # dlog.debug('new task')
            self.do_submit(job_dirs, cmds, args, res, 
                           outlog=outlog, errlog=errlog, 
                           para_deg=para_deg, para_res=para_res)
        if res is not None:
            sleep = res.get('submit_wait_time', sleep)
        time.sleep(sleep) # For preventing the crash of the tasks while submitting        

    def check_finish_tag(self) :
        return self.context.check_file_exists(self.finish_tag_name)

    def _sub_script_inner(self, 
                          job_dirs,
                          cmds,
                          args,
                          res,
                          step = 0,
                          outlog = 'log',
                          errlog = 'err',
                          para_deg = 1,
                          para_res = None) :
        # job_dirs: a list of dirs
        # cmds: a list of cmds, `cmds[i]` will be run in directory `job_dirs[i]`
        # args: a list of args, `args[i]` will be passed to `cmd[i]` in `job_dirs[i]`
        # res: common resources to be used
        # para_res: a list of resources for each cmd, used to make sub-steps
        try:
            allow_failure = res['allow_failure']
        except:
            allow_failure = False
        ret = ""
        # additional checker for ingroup parallel
        if para_deg > 1:
            ret += 'pids=""; FAIL=0\n\n'
        # iter over job dirs
        for idx, (idir, icmd, iarg) in enumerate(zip(job_dirs, cmds, args)) :
            ret += 'cd %s\n' % idir
            ret += 'test $? -ne 0 && exit\n'

            # check if finished
            sub = "\n"
            sub += 'if [ ! -f tag_%d_finished ] ;then\n' % step
            # build command
            tmp_cmd = self.sub_script_cmd(icmd, iarg, res)
            if para_deg > 1 and not res.get("with_mpi", False) and para_res:
                tmp_cmd = self.sub_step_head(para_res[idx]) + tmp_cmd
            sub += '  %s 1>> %s 2>> %s \n' % (tmp_cmd, outlog, errlog)
            # check failure
            if not allow_failure:
                sub += '  if test $? -ne 0; then exit 1; else touch tag_%d_finished; fi \n' % step
            else :
                sub += '  if test $? -ne 0; then touch tag_failure_%d; fi \n' % step
                sub += '  touch tag_%d_finished \n' % step
            sub += 'fi\n'

            # if parallel put step into subshell
            if para_deg > 1:
                sub = f'\n({sub})&\n'
                sub += 'pids+=" $!"\n'
            sub += "\n"

            ret += sub
            ret += 'cd %s\n' % self.context.remote_root
            ret += 'test $? -ne 0 && exit\n'
            if para_deg > 1 and ((idx+1) % para_deg == 0 or idx + 1 == len(job_dirs)):
                ret += '\n\nfor p in $pids; do wait $p || let "FAIL+=1"; done\n'
                ret += 'test $FAIL -ne 0 && exit\n'
                ret += 'pids=""; FAIL=0\n'
            ret += "\n\n"
        
        return ret