Unverified Commit 53117c51 authored by kylasa, committed by GitHub

Reading files in chunks to reduce the memory footprint of pyarrow (#4795)

All tasks completed.
parent 5f0b61b8
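For context, the chunked-read pattern this commit adopts looks roughly like the sketch below (the file name, block size, and column handling are illustrative, not taken from the DGL sources). pyarrow.csv.open_csv returns a streaming reader that yields one RecordBatch per block, so only a small window of the file is decoded in memory at a time, instead of the whole table that csv.read_csv materializes at once.

# Minimal sketch of the chunked-read pattern (illustrative paths/sizes, not DGL code).
import numpy as np
import pyarrow
from pyarrow import csv

edge_file = 'edges.txt'   # hypothetical space-delimited file: "src dst" per line

read_options = pyarrow.csv.ReadOptions(
    use_threads=True, block_size=4096, autogenerate_column_names=True)
parse_options = pyarrow.csv.ParseOptions(delimiter=' ')

src_ids, dst_ids = [], []
# The streaming reader yields one RecordBatch per block, so roughly one
# block of decoded data is resident at a time.
with csv.open_csv(edge_file, read_options=read_options,
                  parse_options=parse_options) as reader:
    for batch in reader:
        table = pyarrow.Table.from_batches([batch])
        src_ids.append(table['f0'].to_numpy())
        dst_ids.append(table['f1'].to_numpy())

src_ids = np.concatenate(src_ids)
dst_ids = np.concatenate(dst_ids)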
@@ -337,11 +337,18 @@ def get_dataset(input_dir, graph_name, rank, world_size, schema_map):
         if not os.path.isabs(edge_file):
             edge_file = os.path.join(input_dir, edge_file)
         logging.info(f'Loading edges of etype[{etype_name}] from {edge_file}')
-        data_df = csv.read_csv(edge_file,
-                read_options=pyarrow.csv.ReadOptions(autogenerate_column_names=True),
-                parse_options=pyarrow.csv.ParseOptions(delimiter=' '))
-        src_ids.append(data_df['f0'].to_numpy())
-        dst_ids.append(data_df['f1'].to_numpy())
+        read_options = pyarrow.csv.ReadOptions(use_threads=True, block_size=4096, autogenerate_column_names=True)
+        parse_options = pyarrow.csv.ParseOptions(delimiter=' ')
+        with pyarrow.csv.open_csv(edge_file, read_options=read_options, parse_options=parse_options) as reader:
+            for next_chunk in reader:
+                if next_chunk is None:
+                    break
+                next_table = pyarrow.Table.from_batches([next_chunk])
+                src_ids.append(next_table['f0'].to_numpy())
+                dst_ids.append(next_table['f1'].to_numpy())
     src_ids = np.concatenate(src_ids)
     dst_ids = np.concatenate(dst_ids)
...
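As a side note, a functionally equivalent variant (hypothetical, not part of this commit) converts each RecordBatch column directly instead of wrapping it in a Table first; the helper name and arguments below are assumptions for illustration only.

import numpy as np
import pyarrow
from pyarrow import csv

def read_edge_ids(edge_file, block_size=4096):
    """Illustrative helper: 'edge_file' is assumed to be a space-delimited
    file whose first two columns are source and destination node ids."""
    read_options = pyarrow.csv.ReadOptions(
        use_threads=True, block_size=block_size,
        autogenerate_column_names=True)
    parse_options = pyarrow.csv.ParseOptions(delimiter=' ')
    src, dst = [], []
    with csv.open_csv(edge_file, read_options=read_options,
                      parse_options=parse_options) as reader:
        for batch in reader:
            # zero_copy_only=False allows the conversion even when the
            # batch needs a copy (e.g. because of nulls).
            src.append(batch.column(0).to_numpy(zero_copy_only=False))
            dst.append(batch.column(1).to_numpy(zero_copy_only=False))
    return np.concatenate(src), np.concatenate(dst)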
@@ -3,6 +3,7 @@ import os
 import numpy as np
 import pyarrow
 import torch
+import copy
 from pyarrow import csv
 from gloo_wrapper import alltoallv_cpu
@@ -60,10 +61,19 @@ class DistLookupService:
             filename = f'{ntype}.txt'
             logging.info(f'[Rank: {rank}] Reading file: {os.path.join(input_dir, filename)}')
-            df = csv.read_csv(os.path.join(input_dir, '{}.txt'.format(ntype)), \
-                    read_options=pyarrow.csv.ReadOptions(autogenerate_column_names=True), \
-                    parse_options=pyarrow.csv.ParseOptions(delimiter=' '))
-            ntype_partids = df['f0'].to_numpy()
+            read_options = pyarrow.csv.ReadOptions(use_threads=True, block_size=4096, autogenerate_column_names=True)
+            parse_options = pyarrow.csv.ParseOptions(delimiter=' ')
+            ntype_partids = []
+            with pyarrow.csv.open_csv(os.path.join(input_dir, '{}.txt'.format(ntype)),
+                    read_options=read_options, parse_options=parse_options) as reader:
+                for next_chunk in reader:
+                    if next_chunk is None:
+                        break
+                    next_table = pyarrow.Table.from_batches([next_chunk])
+                    ntype_partids.append(next_table['f0'].to_numpy())
+            ntype_partids = np.concatenate(ntype_partids)
             count = len(ntype_partids)
             ntype_count.append(count)
...
@@ -75,8 +85,12 @@ class DistLookupService:
             end = count
             type_nid_begin.append(start)
             type_nid_end.append(end)
             # Slice the partition-ids which belong to the current instance.
-            partid_list.append(ntype_partids[start:end])
+            partid_list.append(copy.deepcopy(ntype_partids[start:end]))
+            # Explicitly release the array read from the file.
+            del ntype_partids
         # Store all the information in the object instance variable.
         self.id_map = id_map
...
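The deepcopy/del pair in the second hunk matters because a NumPy basic slice is a view: it keeps the entire parent array alive through its .base reference, so holding only the slice would still pin the full file's worth of partition ids in memory. A minimal sketch of the effect (illustrative sizes, not DGL code):

# Sketch: copying a slice breaks the reference to the large parent array.
import copy
import numpy as np

ntype_partids = np.arange(10_000_000, dtype=np.int64)   # large array (~80 MB)
start, end = 0, 1_000

view = ntype_partids[start:end]                   # view keeps the parent alive
owned = copy.deepcopy(ntype_partids[start:end])   # independent copy (np.copy works too)

print(view.base is ntype_partids)   # True  -> parent buffer cannot be freed
print(owned.base is None)           # True  -> parent buffer can be freed
del ntype_partids, view             # the ~80 MB buffer is now eligible for release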