task.py 9.16 KB
Newer Older
yuhai's avatar
yuhai committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
import os
from pathlib import Path
from deepks.utils import link_file, copy_file, create_dir
from deepks.utils import check_list
from deepks.utils import get_abs_path

import sys
import subprocess as sp
from copy import deepcopy
from contextlib import nullcontext, redirect_stdout, redirect_stderr
from deepks.task.job.dispatcher import Dispatcher


# Public task classes exported by star-imports; the abstract bases
# AbstructStep / AbstructTask are not listed here.
__all__ = ["BlankTask", "PythonTask", "ShellTask", "BatchTask", "GroupBatchTask"]


class AbstructStep(object):
    """Base unit of work bound to a working directory.

    ``workdir`` is always stored as a :class:`pathlib.Path`. Subclasses
    must override :meth:`run`.
    """

    def __init__(self, workdir):
        self.workdir = Path(workdir)

    def __repr__(self):
        cls = type(self)
        return f'<{cls.__module__}.{cls.__name__} with workdir: {self.workdir}>'

    def run(self, *args, **kwargs):
        """Perform the step. Not implemented on the base class."""
        raise NotImplementedError

    def prepend_workdir(self, path):
        """Nest the current workdir under ``path`` (new workdir: ``path / workdir``)."""
        parent = path
        self.workdir = parent / self.workdir

    def append_workdir(self, path):
        """Descend into ``path`` below the current workdir (new workdir: ``workdir / path``)."""
        child = path
        self.workdir = self.workdir / child


class AbstructTask(AbstructStep):
    """Step that stages input files into ``workdir``, executes there, then post-processes.

    ``workdir`` has to be relative so tasks can be chained / nested (see
    ``prepend_workdir``). Inputs from a previous step come either from
    ``prev_task`` (dereferenced to its ``workdir`` lazily, in ``preprocess``)
    or from ``prev_folder``; the two are mutually exclusive. ``prev_folder``
    and ``share_folder`` given here are converted with ``get_abs_path``.

    Every ``link_*`` / ``copy_*`` list holds entries that are either a string
    (used as both source and destination name) or a ``(src, dst)`` pair. For
    ``link_abs_files`` / ``copy_abs_files`` a bare string keeps only its
    basename as the destination name.

    Subclasses implement ``execute``; ``postprocess`` is an optional hook.
    """

    def __init__(self, workdir='.', backup=False, prev_task=None,
                 prev_folder=None, link_prev_files=None, copy_prev_files=None,
                 share_folder=None, link_share_files=None, copy_share_files=None,
                 link_abs_files=None, copy_abs_files=None):
        # workdir has to be relative in order to be chained.
        # prev_task is dereferenced to a folder lazily (in preprocess).
        # Folders given here are made absolute with get_abs_path.
        super().__init__(workdir)
        self.backup = backup
        # inputs come from either a previous task or a folder, never both
        assert prev_task is None or prev_folder is None
        self.prev_task = prev_task
        self.prev_folder = get_abs_path(prev_folder)
        self.share_folder = get_abs_path(share_folder)
        self.link_prev_files = check_list(link_prev_files)
        self.copy_prev_files = check_list(copy_prev_files)
        self.link_share_files = check_list(link_share_files)
        self.copy_share_files = check_list(copy_share_files)
        self.link_abs_files = check_list(link_abs_files)
        self.copy_abs_files = check_list(copy_abs_files)

    @staticmethod
    def _src_dst_pairs(entries, default_dst=None):
        """Yield ``(src, dst)`` for each file entry.

        A bare string ``f`` maps to ``(f, f)``, or ``(f, default_dst(f))``
        when ``default_dst`` is given; any other entry is unpacked as a
        two-item ``(src, dst)`` pair.
        """
        for entry in entries:
            if isinstance(entry, str):
                yield entry, (entry if default_dst is None else default_dst(entry))
            else:
                src, dst = entry
                yield src, dst

    def preprocess(self):
        """Create ``workdir`` and link/copy every requested input file into it.

        Raises:
            ValueError: if previous-step files are requested but neither
                ``prev_task`` nor ``prev_folder`` is set, or share files are
                requested but ``share_folder`` is not set.
        """
        needs_prev = bool(self.link_prev_files or self.copy_prev_files)
        needs_share = bool(self.link_share_files or self.copy_share_files)
        # Fail early with a clear message instead of an AttributeError /
        # TypeError on ``None`` halfway through staging.
        if needs_prev and self.prev_folder is None and self.prev_task is None:
            raise ValueError(
                f'{self!r}: link_prev_files/copy_prev_files given but neither '
                'prev_task nor prev_folder is set')
        if needs_share and self.share_folder is None:
            raise ValueError(
                f'{self!r}: link_share_files/copy_share_files given but '
                'share_folder is not set')
        create_dir(self.workdir, self.backup)
        if self.prev_folder is None and needs_prev:
            # resolve prev_task lazily so later workdir changes on it are honoured
            self.prev_folder = self.prev_task.workdir
        for src, dst in self._src_dst_pairs(self.link_prev_files):
            link_file(self.prev_folder / src, self.workdir / dst)
        for src, dst in self._src_dst_pairs(self.copy_prev_files):
            copy_file(self.prev_folder / src, self.workdir / dst)
        for src, dst in self._src_dst_pairs(self.link_share_files):
            link_file(self.share_folder / src, self.workdir / dst)
        for src, dst in self._src_dst_pairs(self.copy_share_files):
            copy_file(self.share_folder / src, self.workdir / dst)
        for src, dst in self._src_dst_pairs(self.link_abs_files, os.path.basename):
            link_file(src, self.workdir / dst, use_abs=True)
        for src, dst in self._src_dst_pairs(self.copy_abs_files, os.path.basename):
            copy_file(src, self.workdir / dst)

    def execute(self):
        """Do the task's actual work; called with cwd set to ``workdir``."""
        raise NotImplementedError

    def postprocess(self):
        """Optional hook run after ``execute``, back in the original cwd."""
        pass

    def run(self, *args, **kwargs):
        """Run ``preprocess``, then ``execute`` inside ``workdir``, then ``postprocess``.

        The original working directory is restored even if ``execute``
        raises; ``postprocess`` only runs after a successful ``execute``.
        Extra arguments are accepted for interface compatibility and ignored.
        """
        self.preprocess()
        self.olddir = os.getcwd()
        os.chdir(self.workdir)
        try:
            self.execute()
        finally:
            # never leave the process stranded inside this task's workdir
            os.chdir(self.olddir)
        self.postprocess()

    def set_prev_task(self, task):
        """Take previous-step files from ``task``'s workdir (clears ``prev_folder``)."""
        assert isinstance(task, AbstructTask)
        self.prev_folder = None
        self.prev_task = task

    def set_prev_folder(self, path):
        """Take previous-step files from ``path`` (clears ``prev_task``)."""
        self.prev_task = None
        # preprocess joins this with ``/``; store a Path so a str argument works too
        self.prev_folder = None if path is None else Path(path)


class BlankTask(AbstructTask):
    """Task whose ``execute`` does nothing.

    ``run`` still performs the inherited ``preprocess`` (directory creation
    and file staging) and ``postprocess`` steps.
    """

    def execute(self):
        return None


class PythonTask(AbstructTask):
    """Task that calls a Python callable from inside its workdir.

    ``pycallable(*call_args, **call_kwargs)`` is invoked during ``execute``.
    When ``outlog`` / ``errlog`` are given, ``sys.stdout`` / ``sys.stderr``
    are redirected to those files (opened for writing, line-buffered) for
    the duration of the call; otherwise the current streams are kept.
    Remaining keyword arguments go to ``AbstructTask``.
    """

    def __init__(self, pycallable,
                 call_args=None, call_kwargs=None,
                 outlog=None, errlog=None,
                 **task_args):
        super().__init__(**task_args)
        self.pycallable = pycallable
        self.call_args = [] if call_args is None else call_args
        self.call_kwargs = {} if call_kwargs is None else call_kwargs
        self.outlog, self.errlog = outlog, errlog

    def execute(self):
        if self.outlog is None:
            out_ctx = nullcontext(sys.stdout)
        else:
            out_ctx = open(self.outlog, 'w', 1)
        with out_ctx as out_stream, redirect_stdout(out_stream):
            # stderr target is chosen only after stdout has been redirected
            if self.errlog is None:
                err_ctx = nullcontext(sys.stderr)
            else:
                err_ctx = open(self.errlog, 'w', 1)
            with err_ctx as err_stream, redirect_stderr(err_stream):
                self.pycallable(*self.call_args, **self.call_kwargs)


class ShellTask(AbstructTask):
    """Task that runs a shell command line from inside its workdir.

    ``cmd`` is run through ``subprocess.run`` with ``shell=True``; ``env`` is
    passed as the subprocess environment. ``outlog`` / ``errlog`` name files
    (opened for writing, line-buffered) that receive the command's stdout /
    stderr; when ``None`` the stream is inherited. The exit status is not
    checked.
    """

    def __init__(self, cmd, env=None,
                 outlog=None, errlog=None,
                 **task_args):
        super().__init__(**task_args)
        self.cmd, self.env = cmd, env
        self.outlog, self.errlog = outlog, errlog

    def execute(self):
        out_ctx = nullcontext() if self.outlog is None else open(self.outlog, 'w', 1)
        with out_ctx as out_file:
            err_ctx = nullcontext() if self.errlog is None else open(self.errlog, 'w', 1)
            with err_ctx as err_file:
                sp.run(self.cmd, env=self.env, shell=True,
                       stdout=out_file, stderr=err_file)


class BatchTask(AbstructTask):
    """Task whose commands are submitted as one job through a ``Dispatcher``.

    Args:
        cmds: a command or list of commands (normalised with ``check_list``).
        dispatcher: a ``Dispatcher``, a dict of ``Dispatcher`` constructor
            keyword arguments, or ``None`` for a default ``Dispatcher()``.
        resources: passed through unchanged to the dispatcher and job dict.
        outlog, errlog: log file names handed to the dispatcher.
        forward_files, backward_files: file lists (normalised with
            ``check_list``) recorded in the job dict.
        **task_args: forwarded to ``AbstructTask``.
    """

    def __init__(self, cmds,
                 dispatcher=None, resources=None,
                 outlog='log', errlog='err',
                 forward_files=None, backward_files=None,
                 **task_args):
        super().__init__(**task_args)
        self.cmds = check_list(cmds)
        if isinstance(dispatcher, dict):
            dispatcher = Dispatcher(**dispatcher)
        elif dispatcher is None:
            dispatcher = Dispatcher()
        assert isinstance(dispatcher, Dispatcher)
        self.dispatcher = dispatcher
        self.resources = resources
        self.outlog, self.errlog = outlog, errlog
        self.forward_files = check_list(forward_files)
        self.backward_files = check_list(backward_files)

    def execute(self):
        """Submit this task alone (group size 1) to the dispatcher."""
        job = self.make_dict(base=self.workdir)
        self.dispatcher.run_jobs(
            [job],
            group_size=1,
            work_path='.',
            resources=self.resources,
            forward_task_deref=True,
            outlog=self.outlog,
            errlog=self.errlog,
        )

    def make_dict(self, base='.'):
        """Describe this task as a job dict for ``Dispatcher.run_jobs``.

        ``'dir'`` is ``workdir`` expressed relative to ``base``, as a string.
        """
        rel_dir = self.workdir.relative_to(base)
        return dict(
            dir=str(rel_dir),
            cmds=self.cmds,
            resources=self.resources,
            forward_files=self.forward_files,
            backward_files=self.backward_files,
        )


class GroupBatchTask(AbstructTask):
    """Run several ``BatchTask`` objects as one grouped dispatcher submission.

    After grouping, each sub-task's own ``dispatcher``, ``outlog`` and
    ``errlog`` settings are ignored; only this task's settings take effect.
    The given ``batch_tasks`` are deep-copied, and each copy's (relative)
    workdir is nested under this task's workdir. Sub-tasks without their own
    previous-step source inherit this group's ``prev_folder``, or its
    ``prev_task`` when no folder is set.

    Args:
        batch_tasks: iterable of ``BatchTask`` with relative workdirs.
        group_size: passed to ``Dispatcher.run_jobs`` as ``group_size``.
        ingroup_parallel: passed to ``Dispatcher.run_jobs`` as ``para_deg``.
        dispatcher: a ``Dispatcher``, a dict of its constructor kwargs, or
            ``None`` for a default ``Dispatcher()``.
        resources, outlog, errlog: passed to the dispatcher.
        forward_files, backward_files: common files forwarded / retrieved
            for the whole group.
        **task_args: forwarded to ``AbstructTask``.
    """

    def __init__(self, batch_tasks, group_size=1, ingroup_parallel=1,
                 dispatcher=None, resources=None,
                 outlog='log', errlog='err',
                 forward_files=None, backward_files=None,
                 **task_args):
        super().__init__(**task_args)
        self.batch_tasks = [deepcopy(task) for task in batch_tasks]
        for task in self.batch_tasks:
            assert isinstance(task, BatchTask), f'given task is instance of {task.__class__}'
            assert not task.workdir.is_absolute()
            task.prepend_workdir(self.workdir)
            if task.prev_folder is None:
                task.prev_folder = self.prev_folder
            if (task.prev_folder is None and task.prev_task is None
                    and self.prev_task is not None):
                # Previously only prev_folder was inherited, so a group built
                # with prev_task= left its sub-tasks without any source and
                # their preprocess failed. Mirror set_prev_task propagation.
                task.prev_task = self.prev_task
        self.group_size = group_size
        self.para_deg = ingroup_parallel
        if dispatcher is None:
            dispatcher = Dispatcher()
        elif isinstance(dispatcher, dict):
            dispatcher = Dispatcher(**dispatcher)
        assert isinstance(dispatcher, Dispatcher)
        self.dispatcher = dispatcher
        self.resources = resources
        self.outlog = outlog
        self.errlog = errlog
        self.forward_files = check_list(forward_files)
        self.backward_files = check_list(backward_files)

    def execute(self):
        """Submit all sub-tasks, as dicts relative to this workdir, in one dispatcher call."""
        tdicts = [t.make_dict(base=self.workdir) for t in self.batch_tasks]
        self.dispatcher.run_jobs(tdicts, group_size=self.group_size, para_deg=self.para_deg,
                                 work_path='.', resources=self.resources,
                                 forward_task_deref=False,
                                 forward_common_files=self.forward_files,
                                 backward_common_files=self.backward_files,
                                 outlog=self.outlog, errlog=self.errlog)

    def preprocess(self):
        """Stage this task's own files, then run each sub-task's preprocess."""
        super().preprocess()
        for t in self.batch_tasks:
            t.preprocess()

    def prepend_workdir(self, path):
        """Prepend ``path`` to this workdir and to every (already nested) sub-task."""
        super().prepend_workdir(path)
        for t in self.batch_tasks:
            t.prepend_workdir(path)

    def set_prev_task(self, task):
        """Set ``task`` as the previous task for this group and all sub-tasks."""
        super().set_prev_task(task)
        for t in self.batch_tasks:
            t.set_prev_task(task)

    def set_prev_folder(self, path):
        """Set ``path`` as the previous folder for this group and all sub-tasks."""
        super().set_prev_folder(path)
        for t in self.batch_tasks:
            t.set_prev_folder(path)