frame.py 13 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
from dgl.array import DGLArray, DGLDenseArray, DGLSparseArray
import dgl.backend as F
from collections import MutableMapping
from functools import reduce
from itertools import dropwhile
import operator

class DGLFrame(MutableMapping):
    """
    A mapping from column names to column arrays.

    Columns are stored in the ``_columns`` dict.  The number of rows and the
    device of the frame are taken from the first ``DGLDenseArray`` column
    (see ``num_rows`` and ``device``); both are ``None`` while the frame
    holds no dense column.
    """

    def __init__(self, data=None):
        """
        Build a frame from ``data``.

        Parameters
        ----------
        data : None | dict
            ``None`` creates an empty frame.  A dict maps column names to
            column arrays; every value must report the same device as the
            dense columns already inserted, and every ``DGLDenseArray`` must
            have the same number of rows as the ones already inserted.

        Raises
        ------
        AssertionError
            If a column's device or row count disagrees with the frame.
        NotImplementedError
            If ``data`` is neither ``None`` nor a dict.
        """
        self._columns = {}
        if data is None:
            pass
        elif isinstance(data, dict):
            for key, value in data.items():
                # Compare against ``None`` explicitly so that a falsy device
                # or a 0-row frame is still validated.
                device = self.device()
                if device is not None:
                    assert value.device() == device
                if type(value) is DGLDenseArray:
                    num_rows = self.num_rows()
                    if num_rows is not None:
                        assert value.shape[0] == num_rows
                self._columns[key] = value
        else:
            raise NotImplementedError()

    def __copy__(self):
        """
        Return a shallow copy: a new DGLFrame with its own column dict that
        refers to the same column objects.
        """
        # Fill ``_columns`` directly; the columns were validated when they
        # were inserted into this frame.
        clone = DGLFrame()
        clone._columns = dict(self._columns)
        return clone

    def __delitem__(self, key):
        """
        Remove the column named `key`.  Raises KeyError if it does not exist.
        """
        del self._columns[key]

    def __getitem__(self, key):
        """
        This method does things based on the type of `key`.

        If `key` is:
            * str
                selects column with name 'key'
            * type
                selects all columns with types matching the type
                (not implemented)
            * list of str or type
                selects all columns with names or type in the list
                (not implemented)
            * DGLDenseArray
                Performs a logical filter: every column is indexed with the
                given array and the results are returned as a new DGLFrame.
                The array is expected to be the same length as all columns
                in the current DGLFrame.
            * int
                Returns a single row of the DGLFrame (the `key`th one) as a dictionary.
            * slice
                Returns an DGLFrame including only the sliced rows
                (not implemented).

        Any other key type raises RuntimeError.
        """
        if type(key) is str:
            return self._columns[key]
        elif type(key) is type:
            raise NotImplementedError()
        elif type(key) is list:
            raise NotImplementedError()
        elif type(key) is DGLDenseArray:
            return DGLFrame({k: v[key] for k, v in self._columns.items()})
        elif type(key) is int:
            return {k: v[key] for k, v in self._columns.items()}
        elif type(key) is slice:
            raise NotImplementedError()
        else:
            raise RuntimeError()

    def __iter__(self):
        """Iterate over the column names."""
        return iter(self._columns.keys())

    def __len__(self):
        """Return the number of columns (not the number of rows)."""
        return len(self._columns)

    def __setitem__(self, key, value):
        """
        Add or replace the column named `key`.

        Only a str `key` with a DGLDenseArray `value` is implemented.  The
        value must have as many rows as the frame, unless the frame has no
        dense column yet, in which case any length is accepted.

        Raises
        ------
        AssertionError
            If the row count of `value` disagrees with the frame.
        NotImplementedError
            If `value` is a DGLSparseArray or `key` is a list.
        RuntimeError
            For any other `key` or `value` type.
        """
        if type(key) is str:
            if type(value) is DGLDenseArray:
                num_rows = self.num_rows()
                # ``num_rows`` is None while there is no dense column; the
                # first dense column then defines the row count.
                assert num_rows is None or value.shape[0] == num_rows
                self._columns[key] = value
            elif type(value) is DGLSparseArray:
                raise NotImplementedError()
            else:
                raise RuntimeError()
        elif type(key) is list:
            raise NotImplementedError()
        else:
            raise RuntimeError()

    def _next_dense_column(self):
        """
        Return the first column that is a DGLDenseArray, or None if the
        frame has no such column.
        """
        return next(
            (column for column in self._columns.values()
             if type(column) is DGLDenseArray),
            None)

    def append(self, other):
        """
        Add the rows of an DGLFrame to the end of this DGLFrame.

        Both DGLFrames must have the same set of column names.  Neither
        input frame is modified.

        Parameters
        ----------
        other : DGLFrame
            Another DGLFrame whose rows are appended to the current DGLFrame.

        Returns
        -------
        out : DGLFrame
            The result DGLFrame from the append operation.

        Raises
        ------
        AssertionError
            If `other` is not a DGLFrame or the column names differ.
        """
        assert isinstance(other, DGLFrame)
        assert set(self._columns) == set(other._columns)
        if self.num_rows() == 0:
            return other.__copy__()
        elif other.num_rows() == 0:
            return self.__copy__()
        else:
            return DGLFrame({k: v.append(other._columns[k])
                             for k, v in self._columns.items()})

    def device(self):
        """
        Return the device reported by the first dense column, or None if
        the frame has no dense column.
        """
        dense_column = self._next_dense_column()
        return None if dense_column is None else dense_column.device()

    def dropna(self, columns=None, how='any'):
        """
        Return a new DGLFrame with rows filtered by the ``applicable``
        attribute of the selected columns.

        NOTE(review): ``applicable`` is presumably a per-row validity mask
        (non-zero meaning "value present") -- confirm against DGLDenseArray.

        Parameters
        ----------
        columns : None | list
            Names of the columns to inspect.  ``None`` means all columns.
            Must be a non-empty list.

        how : {'any', 'all'}
            * any: keep a row only if the product of the ``applicable``
              entries of all selected columns is non-zero.
            * all: keep a row if the sum of the ``applicable`` entries of
              the selected columns is greater than zero.

        Returns
        -------
        out : DGLFrame

        Raises
        ------
        AssertionError
            If `columns` is not a non-empty list.
        NotImplementedError
            If any selected column is not a DGLDenseArray.
        RuntimeError
            If `how` is not 'any' or 'all'.
        """
        columns = list(self._columns) if columns is None else columns

        assert type(columns) is list
        assert len(columns) > 0

        column_list = [self._columns[x] for x in columns]
        if all(type(x) is DGLDenseArray for x in column_list):
            a_list = [x.applicable for x in column_list]
            if how == 'any':
                a = reduce(operator.mul, a_list)
            elif how == 'all':
                a = (reduce(operator.add, a_list) > 0)
            else:
                raise RuntimeError()
            a_array = DGLDenseArray(a)
            return DGLFrame({k: v[a_array] for k, v in self._columns.items()})
        else:
            raise NotImplementedError()

    def filter_by(self, values, column_name, exclude=False):
        """
        Filter an DGLFrame by the values of one column. Result is an
        DGLFrame that only includes (or excludes) the rows whose entry in
        the column ``column_name`` is one of the given ``values``.

        Parameters
        ----------
        values : DGLDenseArray
            The values to use to filter the DGLFrame.  The resulting DGLFrame will
            only include rows that have one of these values in the given
            column.  Any other type raises NotImplementedError.

        column_name : str
            The column of the DGLFrame to match with the given `values`.

        exclude : bool
            If True, the result DGLFrame will contain all rows EXCEPT those that
            have one of ``values`` in ``column_name``.

        Returns
        -------
        out : DGLFrame
            The filtered DGLFrame.
        """
        if type(values) is DGLDenseArray:
            # The backend works on raw tensors, so unwrap the column the
            # same way ``join`` does.
            mask = F.isin(self._columns[column_name].data, values.data)
            if exclude:
                mask = 1 - mask
            # Row filtering in ``__getitem__`` only accepts a DGLDenseArray,
            # so wrap the mask the same way ``dropna`` does.
            return self[DGLDenseArray(mask)]
        else:
            raise NotImplementedError()

    def groupby(self, key_column_names, operations, *args):
        """
        Perform a group on the key_column_names followed by an aggregation.

        Only a single key column (a str) together with a dict holding
        exactly one aggregator is implemented.  The aggregator is called as
        ``aggregator(self, key_column_names)`` and its result is passed to
        the DGLFrame constructor.  The dict key (output column name) is
        currently not used.

        Parameters
        ----------
        key_column_names : string | list[string]
            Column(s) to group by.  Only a single string is implemented.

        operations : dict, list
            Dictionary of columns and aggregation operations. Each key is a
            output column name and each value is an aggregator.  Lists and
            dicts with more than one entry are not implemented.

        *args
            Currently ignored.

        Returns
        -------
        out_sf : DGLFrame
            A new DGLFrame built from the aggregator's result.

        Raises
        ------
        NotImplementedError
            For the combinations listed above as not implemented.
        RuntimeError
            If `operations` is neither a list nor a dict.
        """
        if type(key_column_names) is str:
            if type(operations) is list:
                raise NotImplementedError()
            elif type(operations) is dict:
                if len(operations) == 1:
                    aggregator, = operations.values()
                    return DGLFrame(aggregator(self, key_column_names))
                else:
                    raise NotImplementedError()
            else:
                raise RuntimeError()
        else:
            raise NotImplementedError()

    def join(self, right, on=None, how='inner'):
        """
        Merge two DGLFrames. Merges the current (left) DGLFrame with the given
        (right) DGLFrame using a SQL-style equi-join operation by columns.

        Only an inner join on a single column name is implemented, and only
        when the key column is a one-dimensional dense tensor on both sides
        with every ``applicable`` entry set.

        Parameters
        ----------
        right : DGLFrame
            The DGLFrame to join.

        on : None | str | list | dict, optional
            The column name(s) representing the set of join keys.

            * If a str is given, this is interpreted as a join using one column,
              where both DGLFrames have the same column name.  It must be the
              only column name the two frames share.

            * None, list and dict are not implemented.

        how : {'left', 'right', 'outer', 'inner'}, optional
            The type of join to perform.  'inner' is default and the only
            implemented value.

            * inner: the rows of the left frame whose key occurs in the
              right frame, together with the non-key columns of the right
              frame restricted to the rows whose key occurs in the left
              frame.

        Returns
        -------
        out : DGLFrame

        Raises
        ------
        AssertionError
            If `right` is not a DGLFrame, if the frames share columns other
            than `on`, or if a key column has non-applicable entries.
        NotImplementedError
            For the unimplemented `on` / `how` values and key column kinds.
        RuntimeError
            For any other `on` type or `how` value.
        """
        assert type(right) is DGLFrame
        if on is None:
            raise NotImplementedError()
        elif type(on) is str:
            assert set(self._columns).intersection(set(right._columns)) == {on}
        elif type(on) is list:
            raise NotImplementedError()
        elif type(on) is dict:
            raise NotImplementedError()
        else:
            raise RuntimeError()

        if how == 'left':
            raise NotImplementedError()
        elif how == 'right':
            raise NotImplementedError()
        elif how == 'outer':
            raise NotImplementedError()
        elif how == 'inner':
            lhs = self._columns[on]
            rhs = right._columns[on]
            if type(lhs) is DGLDenseArray and type(rhs) is DGLDenseArray:
                if isinstance(lhs.data, F.Tensor) and isinstance(rhs.data, F.Tensor) and \
                    len(F.shape(lhs.data)) == 1 and len(F.shape(rhs.data)) == 1:
                    assert F.prod(lhs.applicable) and F.prod(rhs.applicable)
                    # Wrap the masks so columns are indexed the same way as
                    # in ``__getitem__`` and ``dropna``.
                    lhs_mask = DGLDenseArray(F.isin(lhs.data, rhs.data))
                    rhs_mask = DGLDenseArray(F.isin(rhs.data, lhs.data))
                    columns = {k: v[lhs_mask] for k, v in self._columns.items()}
                    # NOTE(review): the two sides are filtered independently,
                    # so rows only line up when keys are unique within each
                    # frame and appear in the same relative order -- confirm
                    # with callers or replace with an index-based gather.
                    columns.update({k: v[rhs_mask]
                                    for k, v in right._columns.items() if k != on})
                    return DGLFrame(columns)
                else:
                    raise NotImplementedError()
            else:
                raise NotImplementedError()
        else:
            raise RuntimeError()

    def num_rows(self):
        """
        Return the length of the first dense column (``shape[0]``), or None
        if the frame has no dense column.
        """
        dense_column = self._next_dense_column()
        return None if dense_column is None else dense_column.shape[0]