Source code for spark_rma.quantile_normalization

#! /usr/bin/env python
# -*- coding: utf-8 -*-
#
# Copyright 2018 Eli Lilly and Company
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Quantile normalize background corrected array samples.
"""

from __future__ import print_function, unicode_literals
import argparse
import logging
import sys

from pyspark.sql import SparkSession, Window
from pyspark.conf import SparkConf
# pylint: disable=no-name-in-module
# row_number is not found by pylint but is a legitimate import
from pyspark.sql.functions import row_number
# pylint: enable=no-name-in-module


def quantile_normalize(data_frame, target):
    """
    Quantile normalize the data using a Spark window spec. This allows
    ranking, averaging, and reassigning values with a join.

    :param data_frame: spark data frame
    :param target: summary target defined by annotation. This is the level
        to which we will summarize, either probeset or transcript_cluster.
    :return: quantile normalized dataframe with sample, probe, target, and
        normalized value.
    """
    # create window spec: rank intensities within each sample
    window = Window.partitionBy('SAMPLE').orderBy('INTENSITY_VALUE')

    # rank probes within each partition by value using 'first' ranking
    ranked = data_frame.select(
        data_frame['SAMPLE'],
        data_frame['PROBE'],
        data_frame[target],
        data_frame['INTENSITY_VALUE'],
        row_number().over(window).alias("row_number")
    )

    # get average by row number across partitions to assign ranks to
    # intensity values
    averaged = ranked.groupBy("row_number").avg("INTENSITY_VALUE")

    # rename so there is no conflict during the join
    averaged_x = averaged.withColumnRenamed("row_number", "row_number_1")
    averaged_x = averaged_x.withColumnRenamed("avg(INTENSITY_VALUE)",
                                              "NORMALIZED_INTENSITY_VALUE")

    # substitute the normalized value by rank into the original data
    q_norm = ranked.join(averaged_x,
                         ranked.row_number == averaged_x.row_number_1,
                         'left_outer')

    # drop the extra columns we no longer need after joining
    q_norm = q_norm.drop('row_number_1', 'INTENSITY_VALUE', 'row_number')
    return q_norm

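# A minimal illustrative sketch of the technique above (not part of the
# module): the sample names, probe ids, and values here are invented for the
# example. Within each SAMPLE the intensities are ranked, the mean is taken
# per rank across samples, and every probe receives the mean for its rank.
#
#     rows = [('S1', 'p1', 'ps1', 2.0), ('S1', 'p2', 'ps1', 5.0),
#             ('S2', 'p1', 'ps1', 4.0), ('S2', 'p2', 'ps1', 3.0)]
#     df = spark.createDataFrame(
#         rows, ['SAMPLE', 'PROBE', 'PROBESET', 'INTENSITY_VALUE'])
#     quantile_normalize(df, 'PROBESET').show()
#
# Here rank 1 averages to (2.0 + 3.0) / 2 = 2.5 and rank 2 to
# (5.0 + 4.0) / 2 = 4.5, so S1/p1 and S2/p2 are assigned 2.5 while
# S1/p2 and S2/p1 are assigned 4.5 as NORMALIZED_INTENSITY_VALUE.
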
def normalize(spark, input_path, output):
    """
    Read parquet file, normalize, and write results.

    :param spark: spark session object
    :param input_path: path to input parquet file with background
        corrected data
    :param output: path to write results
    """
    data_frame = spark.read.parquet(input_path)
    target = infer_target_level(data_frame)
    data_frame = quantile_normalize(data_frame, target)
    data_frame.write.parquet(output)

def infer_target_level(data_frame):
    """
    Read the input file to infer the target type and select the appropriate
    summarization level.
    """
    def inference(headers):
        """Determine target level based on input headers."""
        if 'TRANSCRIPT_CLUSTER' in headers:
            return 'TRANSCRIPT_CLUSTER'
        elif 'PROBESET' in headers:
            return 'PROBESET'
        else:
            raise KeyError('Bad Input, no valid headers found.')

    headers = [_.upper() for _ in data_frame.columns]
    target = inference(headers)
    return target

def command_line():
    """Collect and validate command line arguments."""
    class MyParser(argparse.ArgumentParser):
        """
        Override default behavior, print the whole help message for any
        CLI error.
        """
        def error(self, message):
            print('error: {}\n'.format(message), file=sys.stderr)
            self.print_help()
            sys.exit(2)

    parser = MyParser(description="""
    Quantile Normalization of background corrected CEL files. The
    summarization target level is chosen in the previous step of
    Background Correction and inferred here.

    Input is a parquet file or directory of parquet files with
    background corrected samples. The required input headers:
        - SAMPLE
        - PROBE
        - PROBESET or TRANSCRIPT_CLUSTER
        - INTENSITY_VALUE

    The output of this step is a parquet file of quantile normalized
    samples with the headers:
        - SAMPLE
        - PROBE
        - PROBESET or TRANSCRIPT_CLUSTER
        - NORMALIZED_INTENSITY_VALUE
    """, formatter_class=argparse.RawDescriptionHelpFormatter)
    parser.add_argument('-v', '--verbose',
                        help="Enable verbose logging",
                        action="store_const",
                        dest="loglevel",
                        const=logging.DEBUG,
                        default=logging.INFO)
    parser.add_argument('-i', '--input',
                        help="Input path to directory of background "
                             "corrected CEL files in parquet format.",
                        required=True)
    parser.add_argument('-o', '--output',
                        help="Output filename",
                        default='quantile_normalized.parquet')
    arguments = parser.parse_args()
    return arguments

def main():
    """
    Gather command-line arguments and create a spark session if executed
    with spark-submit.
    """
    args = command_line()
    # honor the -v/--verbose flag when configuring logging
    logging.basicConfig(level=args.loglevel,
                        format='%(asctime)s %(name)-12s '
                               '%(levelname)-8s %(message)s')
    logging.info("Starting Spark...")
    spark_session = SparkSession.builder \
        .config(conf=SparkConf()
                .setAppName("Quantile Normalization")).getOrCreate()
    normalize(spark_session, args.input, args.output)
    logging.info("Complete")
    spark_session.stop()

if __name__ == "__main__":
    main()
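
# Example invocation (illustrative; the input/output paths and any
# spark-submit cluster options are assumptions, not part of this module):
#
#     spark-submit quantile_normalization.py \
#         -i background_corrected.parquet \
#         -o quantile_normalized.parquet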