#! /usr/bin/env python
# -*- coding: utf-8 -*-
#
# Copyright 2018 Eli Lilly and Company
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Converts files into snappy-parquet without using Spark and JVM. This is so they
are compressed before putting into HDFS and/or before Spark reads them to
accelerate all these tasks.
"""
from __future__ import print_function, unicode_literals
import os
import sys
import logging
import argparse
from io import open
from multiprocessing import Pool
import pandas
import fastparquet # also need python-snappy installed.
class Conversion:
    """
    Convert a delimited flat file into snappy-compressed parquet.

    :param input_name: path to the input flat file (tsv or csv)
    :param output_name: path for the parquet output file
    :param delimiter: delimiter mode, 'tab' or 'comma'
    """

    def __init__(self, input_name, output_name, delimiter):
        self.input_name = input_name
        self.output_name = output_name
        self.delimiter = delimiter

    def read(self):
        """
        Read the file into a pandas DataFrame, required by fastparquet.
        The delimiter is switched explicitly rather than sniffed so that
        pandas can use the fast C engine instead of the python engine.

        :return: pandas.DataFrame with the file contents
        :raises ValueError: if the delimiter mode is not 'tab' or 'comma'
        """
        if self.delimiter not in ('tab', 'comma'):
            # Fail fast with a clear message; the original fell through
            # and raised UnboundLocalError on the return statement.
            raise ValueError("Unknown delimiter mode: %r" % self.delimiter)
        sep = '\t' if self.delimiter == 'tab' else ','
        with open(self.input_name, 'r', encoding='utf-8') as file_handle:
            # read_csv with an explicit sep replaces the deprecated
            # pandas.read_table and keeps the C engine for both modes.
            return pandas.read_csv(file_handle, sep=sep, index_col=False)

    def write(self, dataframe):
        """
        Write a pandas dataframe into parquet format using fastparquet
        and snappy compression.

        :param dataframe: a pandas dataframe as input
        """
        fastparquet.write(self.output_name, dataframe, compression='SNAPPY')
        logging.info("Parquet format at: %s", self.output_name)

    def check(self):
        """
        Check if the output file has been written before already. During
        recovery or re-run, don't waste time by re-writing the same
        files.

        :return: bool, True if the output path already exists
        """
        # Explicit bool instead of the original True/None return.
        return os.path.isfile(self.output_name)

    def execute(self):
        """
        Check the file's existence and convert into parquet if the output
        does not already exist. Public method to call on the class.
        """
        if self.check():
            logging.error("File %s already exists!", self.output_name)
        else:
            logging.info("Converting %s to parquet.", self.input_name)
            self.write(self.read())
def call_conversion(in_name, out_name, mode):
    """
    Build a Conversion object and run it to completion. Kept as a
    module-level function because multiprocessing can pickle a function
    but cannot pickle a class.

    :param in_name: input file name in flat file, tsv or csv
    :param out_name: output file name in parquet
    :param mode: delimiter, comma or tab.
    """
    Conversion(in_name, out_name, mode).execute()
def parallelizer(input_directory, output_directory, mode):
    """
    Parallelize this program to convert a whole directory of files using
    a process pool.

    :param input_directory: input directory of flat files
    :param output_directory: name of directory to write to
    :param mode: delimiter in files, currently must be the same for
        whole directory
    """
    in_dir = os.path.abspath(input_directory)
    out_dir = os.path.abspath(output_directory)
    pool = Pool()
    job_tracker = {}
    for each in os.listdir(in_dir):
        in_file = os.path.join(in_dir, each)
        out_name = os.path.join(out_dir, os.path.basename(each) + '.parquet')
        job_tracker[out_name] = pool.apply_async(
            call_conversion, args=(in_file, out_name, mode))
    pool.close()
    pool.join()
    # Surface worker failures: the async results were previously
    # collected but never inspected, so conversion errors vanished
    # silently. Best-effort semantics preserved — failures are logged,
    # not re-raised, so remaining files are still converted.
    for out_name, result in job_tracker.items():
        try:
            result.get()
        except Exception:
            logging.exception("Conversion failed for %s", out_name)
def makedir_if_not_exist(directory):
    """
    Create a directory if it does not already exist.

    Creation is attempted unconditionally and the 'already exists' error
    tolerated, avoiding the check-then-create race in the original
    os.path.exists() guard (another process could create the directory
    between the check and the makedirs call).

    :param directory: (str) Directory name.
    """
    try:
        os.makedirs(directory)
    except OSError:
        # Re-raise real failures (permissions, a file in the way); an
        # already-existing directory is the expected, benign case.
        if not os.path.isdir(directory):
            raise
def command_line():
    """Collect and validate command line arguments.

    :return: argparse.Namespace with input, output, input_directory,
        output_directory and file_type attributes.
    """
    class MyParser(argparse.ArgumentParser):
        """
        Override default behavior, print the whole help message for any CLI
        error.
        """
        def error(self, message):
            print('error: {}\n'.format(message), file=sys.stderr)
            self.print_help()
            sys.exit(2)

    parser = MyParser(description="Convert to Parquet")
    # NOTE: the original used ``type=str or unicode``, which always
    # short-circuits to ``str`` (and ``unicode`` would NameError under
    # Python 3 if it were ever evaluated); plain ``str`` is equivalent.
    parser.add_argument(dest="input", help="Input filename",
                        type=str, nargs='?')
    parser.add_argument(dest="output", help="Output filename",
                        type=str, nargs='?')
    parser.add_argument('-id', '--input_directory', type=str,
                        help="Input directory of files to convert to parquet "
                             "in parallel. This overrides the positional "
                             "arguments for a single file.")
    parser.add_argument('-od', '--output_directory', type=str,
                        help="Specify output directory for the parquet files,"
                             " otherwise they are written to pwd.", default='.')
    file_format = parser.add_mutually_exclusive_group()
    file_format.add_argument("-tab", action="store_const", dest="file_type",
                             const="tab",
                             help="Set this argument if you are supplying a "
                                  "tab-separated file. This is the default.")
    file_format.add_argument("-comma", action="store_const", dest="file_type",
                             const="comma",
                             help="set this argument if you are supplying a "
                                  "comma-separated file.")
    parser.set_defaults(file_type='tab')
    arguments = parser.parse_args()
    # Validation: either both positional names, or an input directory.
    if not arguments.input_directory:
        if arguments.input or arguments.output:
            if not arguments.input or not arguments.output:
                parser.error("You must specify both input file and output "
                             "filename")
        if not arguments.input and not arguments.output:
            parser.error(
                "You must use positional arguments to provide input and "
                "output filename or specify an input directory with "
                "-id/--input_directory")
    return arguments
if __name__ == "__main__":
    # Script entry point: parse the CLI, configure logging, then dispatch
    # to either the single-file or the whole-directory conversion path.
    args = command_line()
    log_format = '%(asctime)s %(name)-12s %(levelname)-8s %(message)s'
    logging.basicConfig(level=logging.DEBUG, format=log_format)
    if args.input and args.output:
        call_conversion(args.input, args.output, args.file_type)
    if args.input_directory:
        makedir_if_not_exist(os.path.abspath(args.output_directory))
        parallelizer(args.input_directory, args.output_directory,
                     args.file_type)