data_utils.py 3.61 KB
Newer Older
chenzk's avatar
v1.0  
chenzk committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90

import torch
import torchvision
import re
import cv2
import numpy as np
import os
import yaml
import logging
from PIL import Image
import torch.distributed as dist
from data.utils.visual_trace import visual_trace
from data.utils.som_tom import som_prompting, tom_prompting
from data.conversations import Constructor
from .conf import VLAConfig, VLARegistry
from dataclasses import dataclass, field
from magma.processing_magma import MagmaProcessor
from .materialize import get_vla_dataset_and_collator
from .datasets.rlds.utils.data_utils import save_dataset_statistics
from data.utils.visual_tracker import visual_tracker

logger = logging.getLogger(__name__)

"""
data_utils.py

General utilities and classes for facilitating data loading and collation.
"""

from dataclasses import dataclass
from typing import Callable, Dict, Sequence, Tuple

import torch
from torch.nn.utils.rnn import pad_sequence
from torch import distributed as dist
# HuggingFace Default / LLaMa-2 IGNORE_INDEX (for labels)
IGNORE_INDEX = -100

@dataclass
class OpenXDataItem:
    def __call__(self, data_root_dir, data_soup, processor=None, conversation_lib=None, image_aug=False, local_run=False, future_action_window_size=1):
        # VLAConfig (`prismatic/conf/vla.py`); override with --vla.type `VLARegistry.<VLA>.vla_id`
        self.openx_data_cfg = VLAConfig.get_choice_class(data_soup)
        default_image_resolution = processor.image_processor.base_img_size
        logger.info(f"Creating VLA Open-X Dataset with Mixture `{self.openx_data_cfg.data_mix}`")
        with open(os.path.join(os.path.dirname(os.path.abspath(__file__)), 'settings.yaml'), 'r') as file:
            self.settings = yaml.safe_load(file)       

        # get local rank for distributed training
        rank = dist.get_rank() if dist.is_initialized() else 0        
        rank = rank % torch.cuda.device_count()
        openx_dataset, action_tokenizer, collator = get_vla_dataset_and_collator(
            data_root_dir,
            self.openx_data_cfg.data_mix,
            shuffle_buffer_size=1 if (local_run or future_action_window_size>1) else self.openx_data_cfg.shuffle_buffer_size,
            image_transform=processor.image_processor,
            visual_tracker=visual_tracker(**self.settings.get('tracker', None), device=f"cuda:{rank}"), 
            dataset_settings=self.settings,
            tokenizer=processor.tokenizer,
            default_image_resolution=(3, default_image_resolution, default_image_resolution),
            image_aug=image_aug,
            future_action_window_size=future_action_window_size, 
            prompt_builder_fn=conversation_lib, # vlm.llm_backbone.prompt_builder_fn,
            local_run=local_run,
        )

        # Save dataset statistics for de-normalization at inference time
        # if overwatch.is_rank_zero():
        #     save_dataset_statistics(openx_dataset.dataset_statistics, run_dir)

        return openx_dataset

class OpenX(Constructor):
    def __init__(self, **kwargs):
        super(OpenX, self).__init__(**kwargs)
        # load settings from settings.yaml file
        with open(os.path.join(os.path.dirname(os.path.abspath(__file__)), 'settings.yaml'), 'r') as file:
            self.settings = yaml.safe_load(file)
        self.spatial_quant_size = kwargs.get('spatial_quant_size', 256)   # this is also used for open-x
        self.num_clusters = self.settings['trace_processor']['num_clusters']
        self.root_dir = kwargs.get('dataset_folder', None)

    def __call__(self, **kwargs):
        return super()._construct_conv(**kwargs)
    
    def filter_items(self, items):
        """
        Filter invalid items
        """
        return items