Commit 5b5073d2, authored Aug 24, 2021 by A. Unique TensorFlower

Preprocessing scripts for the Criteo dataset.

PiperOrigin-RevId: 392782508
parent ea90f0c1

Showing 6 changed files with 579 additions and 0 deletions (+579, -0)
official/pip_package/setup.py (+1, -0)
official/recommendation/ranking/README.md (+3, -0)
official/recommendation/ranking/preprocessing/README.md (+120, -0)
official/recommendation/ranking/preprocessing/criteo_preprocess.py (+310, -0)
official/recommendation/ranking/preprocessing/setup.py (+30, -0)
official/recommendation/ranking/preprocessing/shard_rebalancer.py (+115, -0)
official/pip_package/setup.py

...
@@ -81,6 +81,7 @@ setup(
         'official.pip_package*',
         'official.benchmark*',
         'official.colab*',
+        'official.recommendation.ranking.data.preprocessing*',
     ]),
     exclude_package_data={
         '': ['*_test.py',],
...
official/recommendation/ranking/README.md

...
@@ -68,6 +68,9 @@ Note that the dataset is large (~1TB).
 ### Preprocess the data
+Follow the instructions in
+[Data Preprocessing](data/preprocessing) to
+preprocess the Criteo Terabyte dataset.
 Data preprocessing steps are summarized below.
 Integer feature processing steps, sequentially:
...
official/recommendation/ranking/preprocessing/README.md (new file, mode 100644)

## Download and preprocess Criteo TB dataset

[Apache Beam](https://beam.apache.org) enables distributed preprocessing of the
dataset and can be run on
[Google Cloud Dataflow](https://cloud.google.com/dataflow/). The preprocessing
scripts can also be run locally via DirectRunner, provided that the local host
has enough CPU/Memory/Storage.
Install required packages:

```bash
python3 setup.py install
```
Set up the following environment variables, replacing `bucket-name` with the
name of your Cloud Storage bucket and `my-gcp-project` with your GCP project
name:

```bash
export STORAGE_BUCKET=gs://bucket-name
export PROJECT=my-gcp-project
export REGION=us-central1
```
Note: If running locally, the environment variables above are not needed, and a
local path can be used instead of gs://bucket-name. Also consider passing a
smaller `max_vocab_size` argument.
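For instance, a vocabulary-generation pass run entirely on the local machine
might look like the following; the local paths and the reduced vocabulary size
here are illustrative, not prescribed by this guide:

```bash
python3 criteo_preprocess.py \
  --input_path "/tmp/criteo_raw_sharded/*/*" \
  --output_path "/tmp/criteo/" \
  --temp_dir "/tmp/criteo_vocab/" \
  --vocab_gen_mode --runner DirectRunner --max_vocab_size 100000
```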
1.  Download the raw
    [Criteo TB dataset](https://labs.criteo.com/2013/12/download-terabyte-click-logs/)
    to a GCS bucket. Organize the data in the following way (one possible set of
    copy commands is sketched after this list):

    *   The files day_0.gz, day_1.gz, ..., day_22.gz in
        ${STORAGE_BUCKET}/criteo_raw/train/
    *   The file day_23.gz in ${STORAGE_BUCKET}/criteo_raw/test/
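For example, if the day_*.gz files were downloaded to the current local
directory (an assumption; adjust the source location to wherever you stored
them), the layout above could be produced with:

```bash
gsutil -m cp day_{0..22}.gz "${STORAGE_BUCKET}/criteo_raw/train/"
gsutil -m cp day_23.gz "${STORAGE_BUCKET}/criteo_raw/test/"
```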
2.  Shard the raw training/test data into multiple files.

```bash
python3 shard_rebalancer.py \
  --input_path "${STORAGE_BUCKET}/criteo_raw/train/*" \
  --output_path "${STORAGE_BUCKET}/criteo_raw_sharded/train/train" \
  --num_output_files 1024 --filetype csv --runner DataflowRunner \
  --project ${PROJECT} --region ${REGION}
```

```bash
python3 shard_rebalancer.py \
  --input_path "${STORAGE_BUCKET}/criteo_raw/test/*" \
  --output_path "${STORAGE_BUCKET}/criteo_raw_sharded/test/test" \
  --num_output_files 64 --filetype csv --runner DataflowRunner \
  --project ${PROJECT} --region ${REGION}
```
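As an optional sanity check (not part of the original instructions): Beam's
text sink names shards with a prefix-SSSSS-of-NNNNN pattern by default, so
counting the outputs should match the requested `--num_output_files`:

```bash
gsutil ls "${STORAGE_BUCKET}/criteo_raw_sharded/train/*" | wc -l  # expect 1024
gsutil ls "${STORAGE_BUCKET}/criteo_raw_sharded/test/*" | wc -l   # expect 64
```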
3.  Generate vocabulary and preprocess the data.

Generate vocabulary:

```bash
python3 criteo_preprocess.py \
  --input_path "${STORAGE_BUCKET}/criteo_raw_sharded/*/*" \
  --output_path "${STORAGE_BUCKET}/criteo/" \
  --temp_dir "${STORAGE_BUCKET}/criteo_vocab/" \
  --vocab_gen_mode --runner DataflowRunner --max_vocab_size 5000000 \
  --project ${PROJECT} --region ${REGION}
```
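In this mode no CSV output is written; the vocabulary files are materialized
under the temp directory. Per criteo_preprocess.py they are expected at
tftransform_tmp/feature_{idx}_vocab inside that directory, so a listing along
these lines should show one vocabulary file per categorical feature:

```bash
gsutil ls "${STORAGE_BUCKET}/criteo_vocab/tftransform_tmp/"
```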
Preprocess training and test data:

```bash
python3 criteo_preprocess.py \
  --input_path "${STORAGE_BUCKET}/criteo_raw_sharded/train/*" \
  --output_path "${STORAGE_BUCKET}/criteo/train/train" \
  --temp_dir "${STORAGE_BUCKET}/criteo_vocab/" \
  --runner DataflowRunner --max_vocab_size 5000000 \
  --project ${PROJECT} --region ${REGION}
```

```bash
python3 criteo_preprocess.py \
  --input_path "${STORAGE_BUCKET}/criteo_raw_sharded/test/*" \
  --output_path "${STORAGE_BUCKET}/criteo/test/test" \
  --temp_dir "${STORAGE_BUCKET}/criteo_vocab/" \
  --runner DataflowRunner --max_vocab_size 5000000 \
  --project ${PROJECT} --region ${REGION}
```
4.  (Optional) Re-balance the dataset.

```bash
python3 shard_rebalancer.py \
  --input_path "${STORAGE_BUCKET}/criteo/train/*" \
  --output_path "${STORAGE_BUCKET}/criteo_balanced/train/train" \
  --num_output_files 8192 --filetype csv --runner DataflowRunner \
  --project ${PROJECT} --region ${REGION}
```

```bash
python3 shard_rebalancer.py \
  --input_path "${STORAGE_BUCKET}/criteo/test/*" \
  --output_path "${STORAGE_BUCKET}/criteo_balanced/test/test" \
  --num_output_files 1024 --filetype csv --runner DataflowRunner \
  --project ${PROJECT} --region ${REGION}
```
At this point the training and test data are in:

*   `${STORAGE_BUCKET}/criteo_balanced/train/`
*   `${STORAGE_BUCKET}/criteo_balanced/test/`

All other intermediate directories can be removed (see the sketch below).
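One possible cleanup of the intermediate copies (a suggestion, not part of the
original instructions; double-check the paths before deleting, and keep
criteo_raw if you may need to re-run preprocessing from scratch):

```bash
gsutil -m rm -r "${STORAGE_BUCKET}/criteo_raw_sharded" "${STORAGE_BUCKET}/criteo"
```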
official/recommendation/ranking/preprocessing/criteo_preprocess.py (new file, mode 100644)
# Copyright 2021 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""TFX beam preprocessing pipeline for Criteo data.
Preprocessing util for criteo data. Transformations:
1. Fill missing features with zeros.
2. Set negative integer features to zeros.
3. Normalize integer features using log(x+1).
4. For categorical features (hex), convert to integer and take value modulus the
max_vocab_size value.
Usage:
For raw Criteo data, this script should be run twice.
First run should set vocab_gen_mode to true. This run is used to generate
vocabulary files in the temp_dir location.
Second run should set vocab_gen_mode to false. It is necessary to point to the
same temp_dir used during the first run.
"""
import argparse
import datetime
import os

from absl import logging
import apache_beam as beam
import numpy as np
import tensorflow as tf
import tensorflow_transform as tft
import tensorflow_transform.beam as tft_beam
from tensorflow_transform.tf_metadata import dataset_metadata
from tensorflow_transform.tf_metadata import schema_utils
from tfx_bsl.public import tfxio

parser = argparse.ArgumentParser()
parser.add_argument(
    "--input_path",
    default=None,
    required=True,
    help="Input path. Be sure to set this to cover all data, to ensure "
    "that sparse vocabs are complete.")
parser.add_argument(
    "--output_path",
    default=None,
    required=True,
    help="Output path.")
parser.add_argument(
    "--temp_dir",
    default=None,
    required=True,
    help="Directory to store temporary metadata. Important because vocab "
    "dictionaries will be stored here. Co-located with data, ideally.")
parser.add_argument(
    "--csv_delimeter",
    default="\t",
    help="Delimiter string for input and output.")
parser.add_argument(
    "--vocab_gen_mode",
    action="store_true",
    default=False,
    help="If it is set, process full dataset and do not write CSV output. In "
    "this mode, see temp_dir for vocab files. input_path should cover all "
    "data, e.g. train, test, eval.")
parser.add_argument(
    "--runner",
    help="Runner for Apache Beam, needs to be one of {DirectRunner, "
    "DataflowRunner}.",
    default="DirectRunner")
parser.add_argument(
    "--project",
    default=None,
    help="ID of your project. Ignored by DirectRunner.")
parser.add_argument(
    "--region",
    default=None,
    help="Region. Ignored by DirectRunner.")
parser.add_argument(
    "--max_vocab_size",
    type=int,
    default=10_000_000,
    help="Max index range; categorical features are converted to integer and "
    "taken modulus the max_vocab_size value.")

args = parser.parse_args()

NUM_NUMERIC_FEATURES = 13

NUMERIC_FEATURE_KEYS = [
    f"int-feature-{x + 1}" for x in range(NUM_NUMERIC_FEATURES)]
CATEGORICAL_FEATURE_KEYS = [
    "categorical-feature-%d" % x
    for x in range(NUM_NUMERIC_FEATURES + 1, 40)]
LABEL_KEY = "clicked"
# Data is first preprocessed in pure Apache Beam using numpy.
# This removes missing values and hexadecimal-encoded values.
# For the TF schema, we can thus specify the schema as FixedLenFeature
# for TensorFlow Transform.
FEATURE_SPEC = dict([(name, tf.io.FixedLenFeature([], dtype=tf.int64))
                     for name in CATEGORICAL_FEATURE_KEYS] +
                    [(name, tf.io.FixedLenFeature([], dtype=tf.float32))
                     for name in NUMERIC_FEATURE_KEYS] +
                    [(LABEL_KEY, tf.io.FixedLenFeature([], tf.float32))])

INPUT_METADATA = dataset_metadata.DatasetMetadata(
    schema_utils.schema_from_feature_spec(FEATURE_SPEC))
def apply_vocab_fn(inputs):
  """Preprocessing fn for sparse features.

  Applies vocab to bucketize sparse features. This function operates using
  previously-created vocab files.

  Pre-condition: Full vocab has been materialized.

  Args:
    inputs: Input features to transform.

  Returns:
    Output dict with transformed features.
  """
  outputs = {}

  outputs[LABEL_KEY] = inputs[LABEL_KEY]
  for key in NUMERIC_FEATURE_KEYS:
    outputs[key] = inputs[key]
  for idx, key in enumerate(CATEGORICAL_FEATURE_KEYS):
    vocab_fn = os.path.join(
        args.temp_dir, "tftransform_tmp", "feature_{}_vocab".format(idx))
    outputs[key] = tft.apply_vocabulary(inputs[key], vocab_fn)

  return outputs


def compute_vocab_fn(inputs):
  """Preprocessing fn for sparse features.

  This function computes unique IDs for the sparse features. We rely on implicit
  behavior which writes the vocab files to the vocab_filename specified in
  tft.compute_and_apply_vocabulary.

  Pre-condition: Sparse features have been converted to integer and mod'ed with
  args.max_vocab_size.

  Args:
    inputs: Input features to transform.

  Returns:
    Output dict with transformed features.
  """
  outputs = {}

  outputs[LABEL_KEY] = inputs[LABEL_KEY]
  for key in NUMERIC_FEATURE_KEYS:
    outputs[key] = inputs[key]
  for idx, key in enumerate(CATEGORICAL_FEATURE_KEYS):
    outputs[key] = tft.compute_and_apply_vocabulary(
        x=inputs[key],
        vocab_filename="feature_{}_vocab".format(idx))

  return outputs
class FillMissing(beam.DoFn):
  """Fills missing elements with zero string value."""

  def process(self, element):
    elem_list = element.split(args.csv_delimeter)
    out_list = []
    for val in elem_list:
      new_val = "0" if not val else val
      out_list.append(new_val)
    yield (args.csv_delimeter).join(out_list)


class NegsToZeroLog(beam.DoFn):
  """For int features, sets negative values to zero and takes log(x+1)."""

  def process(self, element):
    elem_list = element.split(args.csv_delimeter)
    out_list = []
    for i, val in enumerate(elem_list):
      if i > 0 and i <= NUM_NUMERIC_FEATURES:
        new_val = "0" if int(val) < 0 else val
        new_val = np.log(int(new_val) + 1)
        new_val = str(new_val)
      else:
        new_val = val
      out_list.append(new_val)
    yield (args.csv_delimeter).join(out_list)


class HexToIntModRange(beam.DoFn):
  """For categorical features, takes decimal value and mods with max value."""

  def process(self, element):
    elem_list = element.split(args.csv_delimeter)
    out_list = []
    for i, val in enumerate(elem_list):
      if i > NUM_NUMERIC_FEATURES:
        new_val = int(val, 16) % args.max_vocab_size
      else:
        new_val = val
      out_list.append(str(new_val))
    yield str.encode((args.csv_delimeter).join(out_list))
def transform_data(data_path, output_path):
  """Preprocesses Criteo data.

  Two processing modes are supported. Raw data will require two passes.
  If full vocab files already exist, only one pass is necessary.

  Args:
    data_path: File(s) to read.
    output_path: Path to which output CSVs are written, if necessary.
  """

  preprocessing_fn = compute_vocab_fn if args.vocab_gen_mode else apply_vocab_fn

  gcp_project = args.project
  region = args.region

  job_name = (f"criteo-preprocessing-"
              f"{datetime.datetime.now().strftime('%y%m%d-%H%M%S')}")

  # set up Beam pipeline.
  pipeline_options = None

  if args.runner == "DataflowRunner":
    options = {
        "staging_location": os.path.join(output_path, "tmp", "staging"),
        "temp_location": os.path.join(output_path, "tmp"),
        "job_name": job_name,
        "project": gcp_project,
        "save_main_session": True,
        "region": region,
        "setup_file": "./setup.py",
    }
    pipeline_options = beam.pipeline.PipelineOptions(flags=[], **options)
  elif args.runner == "DirectRunner":
    pipeline_options = beam.options.pipeline_options.DirectOptions(
        direct_num_workers=os.cpu_count(),
        direct_running_mode="multi_threading")

  with beam.Pipeline(args.runner, options=pipeline_options) as pipeline:
    with tft_beam.Context(temp_dir=args.temp_dir):
      processed_lines = (
          pipeline
          # Read in TSV data.
          | beam.io.ReadFromText(data_path, coder=beam.coders.StrUtf8Coder())
          # Fill in missing elements with the defaults (zeros).
          | "FillMissing" >> beam.ParDo(FillMissing())
          # For numerical features, set negatives to zero. Then take log(x+1).
          | "NegsToZeroLog" >> beam.ParDo(NegsToZeroLog())
          # For categorical features, mod the values with vocab size.
          | "HexToIntModRange" >> beam.ParDo(HexToIntModRange()))

      # CSV reader: List the cols in order, as dataset schema is not ordered.
      ordered_columns = [LABEL_KEY
                        ] + NUMERIC_FEATURE_KEYS + CATEGORICAL_FEATURE_KEYS

      csv_tfxio = tfxio.BeamRecordCsvTFXIO(
          physical_format="text",
          column_names=ordered_columns,
          delimiter=args.csv_delimeter,
          schema=INPUT_METADATA.schema)

      converted_data = (
          processed_lines
          | "DecodeData" >> csv_tfxio.BeamSource())

      raw_dataset = (converted_data, csv_tfxio.TensorAdapterConfig())

      # The TFXIO output format is chosen for improved performance.
      transformed_dataset, _ = (
          raw_dataset | tft_beam.AnalyzeAndTransformDataset(
              preprocessing_fn, output_record_batches=False))

      # Transformed metadata is not necessary for encoding.
      transformed_data, transformed_metadata = transformed_dataset

      if not args.vocab_gen_mode:
        # Write to CSV.
        transformed_csv_coder = tft.coders.CsvCoder(
            ordered_columns, transformed_metadata.schema,
            delimiter=args.csv_delimeter)
        _ = (
            transformed_data
            | "EncodeDataCsv" >> beam.Map(transformed_csv_coder.encode)
            | "WriteDataCsv" >> beam.io.WriteToText(output_path))


if __name__ == "__main__":
  logging.set_verbosity(logging.INFO)
  transform_data(data_path=args.input_path,
                 output_path=args.output_path)
official/recommendation/ranking/preprocessing/setup.py (new file, mode 100644)
# Copyright 2021 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Setup configuration for criteo dataset preprocessing.
This is used while running Tensorflow transform on Cloud Dataflow.
"""
import
setuptools
version
=
"0.1.0"
if
__name__
==
"__main__"
:
setuptools
.
setup
(
name
=
"criteo_preprocessing"
,
version
=
version
,
install_requires
=
[
"tensorflow-transform"
],
packages
=
setuptools
.
find_packages
(),
)
official/recommendation/ranking/preprocessing/shard_rebalancer.py (new file, mode 100644)
# Copyright 2021 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Rebalance a set of CSV/TFRecord shards to a target number of files.
"""
import
argparse
import
datetime
import
os
import
apache_beam
as
beam
import
tensorflow
as
tf
parser
=
argparse
.
ArgumentParser
()
parser
.
add_argument
(
"--input_path"
,
default
=
None
,
required
=
True
,
help
=
"Input path."
)
parser
.
add_argument
(
"--output_path"
,
default
=
None
,
required
=
True
,
help
=
"Output path."
)
parser
.
add_argument
(
"--num_output_files"
,
type
=
int
,
default
=
256
,
help
=
"Number of output file shards."
)
parser
.
add_argument
(
"--filetype"
,
default
=
"tfrecord"
,
help
=
"File type, needs to be one of {tfrecord, csv}."
)
parser
.
add_argument
(
"--project"
,
default
=
None
,
help
=
"ID (not name) of your project. Ignored by DirectRunner"
)
parser
.
add_argument
(
"--runner"
,
help
=
"Runner for Apache Beam, needs to be one of "
"{DirectRunner, DataflowRunner}."
,
default
=
"DirectRunner"
)
parser
.
add_argument
(
"--region"
,
default
=
None
,
help
=
"region"
)
args
=
parser
.
parse_args
()
def
rebalance_data_shards
():
"""Rebalances data shards."""
def
csv_pipeline
(
pipeline
:
beam
.
Pipeline
):
"""Rebalances CSV dataset.
Args:
pipeline: Beam pipeline object.
"""
_
=
(
pipeline
|
beam
.
io
.
ReadFromText
(
args
.
input_path
)
|
beam
.
io
.
WriteToText
(
args
.
output_path
,
num_shards
=
args
.
num_output_files
))
def
tfrecord_pipeline
(
pipeline
:
beam
.
Pipeline
):
"""Rebalances TFRecords dataset.
Args:
pipeline: Beam pipeline object.
"""
example_coder
=
beam
.
coders
.
ProtoCoder
(
tf
.
train
.
Example
)
_
=
(
pipeline
|
beam
.
io
.
ReadFromTFRecord
(
args
.
input_path
,
coder
=
example_coder
)
|
beam
.
io
.
WriteToTFRecord
(
args
.
output_path
,
file_name_suffix
=
"tfrecord"
,
coder
=
example_coder
,
num_shards
=
args
.
num_output_files
))
job_name
=
(
f
"shard-rebalancer-
{
datetime
.
datetime
.
now
().
strftime
(
'%y%m%d-%H%M%S'
)
}
"
)
# set up Beam pipeline.
options
=
{
"staging_location"
:
os
.
path
.
join
(
args
.
output_path
,
"tmp"
,
"staging"
),
"temp_location"
:
os
.
path
.
join
(
args
.
output_path
,
"tmp"
),
"job_name"
:
job_name
,
"project"
:
args
.
project
,
"save_main_session"
:
True
,
"region"
:
args
.
region
,
}
opts
=
beam
.
pipeline
.
PipelineOptions
(
flags
=
[],
**
options
)
with
beam
.
Pipeline
(
args
.
runner
,
options
=
opts
)
as
pipeline
:
if
args
.
filetype
==
"tfrecord"
:
tfrecord_pipeline
(
pipeline
)
elif
args
.
filetype
==
"csv"
:
csv_pipeline
(
pipeline
)
if
__name__
==
"__main__"
:
rebalance_data_shards
()