setu.utilities
import argparse
import os
import shutil
from pyspark.sql import DataFrame
from pyspark.sql.functions import (
    split,
    count,
    sum,
    avg,
    udf,
    min,
    max,
    when,
    col,
    create_map,
    collect_list,
    expr,
    map_from_arrays,
    posexplode,
    struct,
)

from pyspark.sql.types import (
    StringType,
)


def str2bool(v: str) -> bool:
    """str2bool Returns the boolean equivalent given various string representations of the True/False values.

    Args:
        v (str): A string that might represent a boolean value. An actual ``bool``
            (for example an argparse default) is returned unchanged.

    Raises:
        argparse.ArgumentTypeError: Raised when the value is not one of the recognised spellings.

    Returns:
        bool: The bool equivalent of the provided value.
    """
    # Defaults handed over by argparse may already be booleans; ``.lower()`` would fail on them.
    if isinstance(v, bool):
        return v
    lowered = v.lower()
    if lowered in ('yes', 'true', 't', 'y', '1'):
        return True
    if lowered in ('no', 'false', 'f', 'n', '0'):
        return False
    raise argparse.ArgumentTypeError('Boolean value expected.')


def list_of_strings(arg: str) -> list:
    """list_of_strings Generate the list of strings provided a single string.

    Args:
        arg (str): The string argument which needs to be split.

    Returns:
        list: The pieces of ``arg`` obtained by splitting on the ',' delimiter.
    """
    return arg.split(',')


def rename_partitioned_directories(base_dir: str, partition_column_name: str):
    """rename_partitioned_directories Strip the ``<column>=`` prefix from partition directories.

    Every entry of ``base_dir`` whose name starts with ``partition_column_name + "="``
    is moved to a sibling named after the remainder of its name.

    Args:
        base_dir (str): Base directory path.
        partition_column_name (str): Column name based on which the partitions were produced.
    """
    prefix = partition_column_name + "="
    for dir_name in os.listdir(base_dir):
        if not dir_name.startswith(prefix):
            continue
        # Remove only the leading prefix so values that contain '=' themselves stay intact.
        new_name = dir_name[len(prefix):]
        if not new_name:
            # An empty partition value would make the destination ``base_dir`` itself.
            continue
        shutil.move(os.path.join(base_dir, dir_name), os.path.join(base_dir, new_name))


class ChunkHandler():
    """ChunkHandler Handler that splits documents into lines and merges lines back into documents.
    """
    def doc2lines(
        self,
        df: DataFrame,
        text_column: str,
        split_symbol: str
    ) -> DataFrame:
        """doc2lines Given a dataframe, splits every document into one row per line.

        Args:
            df (DataFrame): The dataframe object input.
            text_column (str): The column name for the text in the dataframe.
            split_symbol (str): The pattern on which the text is split.

        Returns:
            DataFrame: The input columns with ``text_column`` replaced by a single line per
                row, plus the ``pos`` column produced by ``posexplode``.
        """
        as_array = df.withColumn(text_column, split(text_column, split_symbol, -1))
        # posexplode emits the columns ``pos`` and ``col``; ``col`` takes over the text column's name.
        exploded = as_array.select("*", posexplode(text_column))
        return exploded.drop(text_column).withColumnRenamed("col", text_column)

    def lines2doc(
        self,
        df: DataFrame,
        text_column: str,
        identifier_column: str,
        sort_column: str
    ) -> DataFrame:
        """lines2doc Given a dataframe, merges the lines of each document back into one text.

        Args:
            df (DataFrame): The dataframe object input.
            text_column (str): The column name for the text in the dataframe.
            identifier_column (str): The column based on which the lines need to be grouped into documents.
            sort_column (str): The column that determines the order of the lines inside a document.

        Returns:
            DataFrame: One row per ``identifier_column`` value, with ``text_column`` holding the
                concatenated lines.
        """
        def join_using_symbol(x):
            parts = []
            for line in x:
                if not line:
                    continue
                text = line[text_column]
                # A null text value cannot be concatenated; skip it instead of failing the task.
                if text is None:
                    continue
                # A line starting with two spaces loses exactly one of them.
                parts.append(text[1:] if text[:2] == "  " else text)
            return "".join(parts)

        join_lines = udf(join_using_symbol, StringType())

        # Pair every line with its sort key so the order survives collect_list.
        paired = df.withColumn(text_column, struct([sort_column, text_column])) \
            .select(identifier_column, text_column)
        collected = paired.groupBy(identifier_column) \
            .agg(collect_list(text_column).alias(text_column))
        # array_sort compares structs field by field, so the sort key goes first.
        ordered = collected.withColumn(
            text_column,
            expr(
                f"array_sort(transform({text_column},x->struct(x['{sort_column}'] as {sort_column},x['{text_column}'] as {text_column})))"
            )
        )
        return ordered.withColumn(text_column, join_lines(text_column))


class SparkOptimizedHandlers():
    """SparkOptimizedHandlers Handler that computes per-document statistics and flags with Spark aggregations.
    """
    def get_num_lines(self, grouped_line_df: DataFrame) -> DataFrame:
        """get_num_lines Count the rows (lines) of every group.

        Args:
            grouped_line_df (DataFrame): The grouped lines, e.g. the result of ``line_df.groupBy(...)``.

        Returns:
            DataFrame: The aggregation result with the count in a ``lines_count`` column.
        """
        return grouped_line_df.agg(count("*").alias("lines_count"))

    def get_mean_line_length(self, grouped_line_df: DataFrame, line_len_col_: str) -> DataFrame:
        """get_mean_line_length Average ``line_len_col_`` over every group.

        Args:
            grouped_line_df (DataFrame): The grouped lines, e.g. the result of ``line_df.groupBy(...)``.
            line_len_col_ (str): Column that represents the line length in a document.

        Returns:
            DataFrame: The aggregation result with the average in a ``mean_line_length`` column.
        """
        return grouped_line_df.agg(avg(line_len_col_).alias("mean_line_length"))

    def get_min_line_length(self, grouped_line_df: DataFrame, line_len_col_: str) -> DataFrame:
        """get_min_line_length Take the minimum of ``line_len_col_`` over every group.

        Args:
            grouped_line_df (DataFrame): The grouped lines, e.g. the result of ``line_df.groupBy(...)``.
            line_len_col_ (str): Column that represents the line length in a document.

        Returns:
            DataFrame: The aggregation result with the minimum in a ``min_line_length`` column.
        """
        return grouped_line_df.agg(min(line_len_col_).alias("min_line_length"))

    def get_max_line_length(self, grouped_line_df: DataFrame, line_len_col_: str) -> DataFrame:
        """get_max_line_length Take the maximum of ``line_len_col_`` over every group.

        Args:
            grouped_line_df (DataFrame): The grouped lines, e.g. the result of ``line_df.groupBy(...)``.
            line_len_col_ (str): Column name that contains the line length for the various document lines.

        Returns:
            DataFrame: The aggregation result with the maximum in a ``max_line_length`` column.
        """
        return grouped_line_df.agg(max(line_len_col_).alias("max_line_length"))

    def get_nsfw_words_count(self, grouped_line_df: DataFrame, line_nsfw_count_col_: str) -> DataFrame:
        """get_nsfw_words_count Sum ``line_nsfw_count_col_`` over every group.

        Args:
            grouped_line_df (DataFrame): The grouped lines, e.g. the result of ``line_df.groupBy(...)``.
            line_nsfw_count_col_ (str): Column name that contains the nsfw word count for the various document lines.

        Returns:
            DataFrame: The aggregation result with the total in a ``nsfw_words_count`` column.
        """
        return grouped_line_df.agg(sum(line_nsfw_count_col_).alias("nsfw_words_count"))

    def get_non_li_words_count(self, grouped_line_df: DataFrame, line_non_li_count_col_: str) -> DataFrame:
        """get_non_li_words_count Sum ``line_non_li_count_col_`` over every group.

        Args:
            grouped_line_df (DataFrame): The grouped lines, e.g. the result of ``line_df.groupBy(...)``.
            line_non_li_count_col_ (str): Column name that contains the non-li count for the various document lines.

        Returns:
            DataFrame: The aggregation result with the total in a ``non_li_char_count`` column.
        """
        # The output column is named "non_li_char_count" although the method name says "words".
        return grouped_line_df.agg(sum(line_non_li_count_col_).alias("non_li_char_count"))

    def get_bytes(self, grouped_line_df: DataFrame, line_bytes_col_: str) -> DataFrame:
        """get_bytes Sum ``line_bytes_col_`` over every group.

        Args:
            grouped_line_df (DataFrame): The grouped lines, e.g. the result of ``line_df.groupBy(...)``.
            line_bytes_col_ (str): Column name that contains the total bytes for the various document lines.

        Returns:
            DataFrame: The aggregation result with the total in a ``bytes`` column.
        """
        return grouped_line_df.agg(sum(line_bytes_col_).alias("bytes"))

    def get_words_count(self, grouped_line_df: DataFrame, line_words_count_col_: str) -> DataFrame:
        """get_words_count Sum ``line_words_count_col_`` over every group.

        Args:
            grouped_line_df (DataFrame): The grouped lines, e.g. the result of ``line_df.groupBy(...)``.
            line_words_count_col_ (str): Column name that contains the word count of the various document lines.

        Returns:
            DataFrame: The aggregation result with the total in a ``words_count`` column.
        """
        return grouped_line_df.agg(sum(line_words_count_col_).alias("words_count"))

    def get_char_count(self, grouped_line_df: DataFrame, line_char_count_col_: str) -> DataFrame:
        """get_char_count Sum ``line_char_count_col_`` over every group.

        Args:
            grouped_line_df (DataFrame): The grouped lines, e.g. the result of ``line_df.groupBy(...)``.
            line_char_count_col_ (str): Column name that contains the char count of the various document lines.

        Returns:
            DataFrame: The aggregation result with the total in a ``char_count`` column.
        """
        return grouped_line_df.agg(sum(line_char_count_col_).alias("char_count"))

    def get_repeated_line_dist(self, line_df: DataFrame, id_col: str, text_col: str) -> DataFrame:
        """get_repeated_line_dist Build, per document, a map from line text to its number of occurrences.

        Args:
            line_df (DataFrame): Dataframe object containing the lines.
            id_col (str): The column based on which the dataframe needs to be grouped by.
            text_col (str): The column name for the text in the dataframe.

        Returns:
            DataFrame: One row per ``id_col`` value with a ``repeated_line_dist`` map column
                (line text -> occurrence count).
        """
        col_name = "repeated_line_dist"

        # Occurrences of every distinct line inside each document.
        per_line = line_df.groupBy(id_col, text_col).agg(count("*").alias(col_name))
        # One single-entry map per distinct line, gathered per document.
        per_doc = per_line.groupBy(id_col) \
            .agg(collect_list(create_map([text_col, col_name])).alias(col_name))
        # Flatten the list of single-entry maps into one map through its key and value arrays.
        merged = per_doc \
            .withColumn("keys", expr(f"transform({col_name}, x -> map_keys(x)[0])")) \
            .withColumn("values", expr(f"transform({col_name}, x -> map_values(x)[0])")) \
            .withColumn(col_name, map_from_arrays(col("keys"), col("values")))
        return merged.drop("keys", "values")

    def run_analysis(
        self,
        line_df: DataFrame,
        doc_id_col: str,
        line_nsfw_count_col_: str,
        line_non_li_count_col_: str,
        line_bytes_col_: str,
        line_words_count_col_: str,
        line_char_count_col_: str,
        only_base_stats: bool = False,
    ) -> DataFrame:
        """run_analysis Method that runs the analysis and aggregates the various stats for the dataframe.

        Args:
            line_df (DataFrame): Dataframe object containing the lines.
            doc_id_col (str): The column based on which the dataframe needs to be grouped by.
            line_nsfw_count_col_ (str): Column name that contains the nsfw word count of the various document lines.
            line_non_li_count_col_ (str): Column name that contains the non latin-indic count of the various document lines.
            line_bytes_col_ (str): Column name that contains the byte count of the various document lines.
            line_words_count_col_ (str): Column name that contains the word count of the various document lines.
            line_char_count_col_ (str): Column name that contains the character count of the various document lines.
            only_base_stats (bool, optional): When True only ``bytes``, ``words_count`` and
                ``char_count`` are computed. Defaults to False.

        Returns:
            DataFrame: One row per ``doc_id_col`` value with the computed statistic columns.
        """
        grouped_line_df = line_df.groupBy(doc_id_col)

        doc_df = self.get_bytes(grouped_line_df, line_bytes_col_) \
            .join(self.get_words_count(grouped_line_df, line_words_count_col_), [doc_id_col]) \
            .join(self.get_char_count(grouped_line_df, line_char_count_col_), [doc_id_col])

        if only_base_stats:
            return doc_df

        # NOTE(review): the line-length statistics read the literal "words_count" column
        # rather than ``line_words_count_col_`` - confirm this is intended.
        line_len_col = "words_count"
        extra_stats = [
            self.get_num_lines(grouped_line_df),
            self.get_mean_line_length(grouped_line_df, line_len_col),
            self.get_min_line_length(grouped_line_df, line_len_col),
            self.get_max_line_length(grouped_line_df, line_len_col),
            self.get_nsfw_words_count(grouped_line_df, line_nsfw_count_col_),
            self.get_non_li_words_count(grouped_line_df, line_non_li_count_col_),
        ]
        for stat_df in extra_stats:
            doc_df = doc_df.join(stat_df, [doc_id_col])

        return doc_df

    def run_flagging(
        self,
        doc_df: DataFrame,
        word_count_col: str,
        char_count_col: str,
        nsfw_count_col: str,
        nsfw_threshold: float,
        non_li_count_col: str,
        non_li_threshold: float,
        min_line_count: int,
        line_count_col: str,
        min_mean_line_len: int,
        mean_line_len_col: str,
    ) -> DataFrame:
        """run_flagging Method that executes the flagging stage based on computed document statistics.

        Args:
            doc_df (DataFrame): The dataframe object containing the various documents.
            word_count_col (str): Column name that contains the word count of the documents.
            char_count_col (str): Column name that contains the character count of the documents.
            nsfw_count_col (str): Column name that contains the nsfw word count of the documents.
            nsfw_threshold (float): Ratio of nsfw words to words at or above which a document is flagged.
            non_li_count_col (str): Column name that contains the non latin-indic count of the documents.
            non_li_threshold (float): Ratio of non latin-indic count to characters at or above which a document is flagged.
            min_line_count (int): Line count at or below which a document is flagged.
            line_count_col (str): Column name that contains the line count of the various documents.
            min_mean_line_len (int): Mean line length at or below which a document is flagged.
            mean_line_len_col (str): Column name that contains the mean line length of the documents.

        Returns:
            DataFrame: ``doc_df`` with the boolean columns ``has_less_lines``,
                ``is_short_lines_heavy``, ``is_nsfw_heavy`` and ``is_non_li_heavy`` appended.
        """
        has_less_lines = when(doc_df[line_count_col] <= min_line_count, True).otherwise(False)
        is_short_lines_heavy = when(doc_df[mean_line_len_col] <= min_mean_line_len, True).otherwise(False)
        is_nsfw_heavy = when(doc_df[nsfw_count_col]/doc_df[word_count_col] >= nsfw_threshold, True).otherwise(False)
        is_non_li_heavy = when(doc_df[non_li_count_col]/doc_df[char_count_col] >= non_li_threshold, True).otherwise(False)

        doc_df = doc_df \
            .select("*", has_less_lines.alias("has_less_lines")) \
            .select("*", is_short_lines_heavy.alias("is_short_lines_heavy")) \
            .select("*", is_nsfw_heavy.alias("is_nsfw_heavy")) \
            .select("*", is_non_li_heavy.alias("is_non_li_heavy"))

        return doc_df
def str2bool(v: str) -> bool:
    """str2bool Returns the boolean equivalent given various string representations of the True/False values.

    Args:
        v (str): A string that might represent a boolean value. An actual ``bool``
            (for example an argparse default) is returned unchanged.

    Raises:
        argparse.ArgumentTypeError: Raised when the value is not one of the recognised spellings.

    Returns:
        bool: The bool equivalent of the provided value.
    """
    # Defaults handed over by argparse may already be booleans; ``.lower()`` would fail on them.
    if isinstance(v, bool):
        return v
    lowered = v.lower()
    if lowered in ('yes', 'true', 't', 'y', '1'):
        return True
    if lowered in ('no', 'false', 'f', 'n', '0'):
        return False
    raise argparse.ArgumentTypeError('Boolean value expected.')
str2bool Returns the boolean equivalent given various string representations of the True/False values.
Arguments:
- v (str): A string that might represent a boolean value.
Raises:
- argparse.ArgumentTypeError: Error that mentions the provided value does not represent a boolean value.
Returns:
bool : Returns the bool equivalent of the provided value.
def list_of_strings(arg: str) -> list:
    """list_of_strings Break a comma-separated string into its pieces.

    Args:
        arg (str): The string argument which needs to be split.

    Returns:
        list: The substrings of ``arg`` found between ',' characters, in order.
    """
    pieces = arg.split(',')
    return pieces
list_of_strings Generate the list of strings provided a single string.
Arguments:
- arg (str): The string argument which needs to be split.
Returns:
list: List of strings obtained by splitting the input string on the ',' delimiter.
def rename_partitioned_directories(base_dir: str, partition_column_name: str):
    """rename_partitioned_directories Strip the ``<column>=`` prefix from partition directories.

    Every entry of ``base_dir`` whose name starts with ``partition_column_name + "="``
    is moved to a sibling named after the remainder of its name.

    Args:
        base_dir (str): Base directory path.
        partition_column_name (str): Column name based on which the partitions were produced.
    """
    prefix = partition_column_name + "="
    for dir_name in os.listdir(base_dir):
        if not dir_name.startswith(prefix):
            continue
        # Remove only the leading prefix so values that contain '=' themselves stay intact.
        new_name = dir_name[len(prefix):]
        if not new_name:
            # An empty partition value would make the destination ``base_dir`` itself.
            continue
        shutil.move(os.path.join(base_dir, dir_name), os.path.join(base_dir, new_name))
rename_partitioned_directories Function that renames the partitioned directories.
Arguments:
- base_dir (str): Base directory path
- partition_column_name (str): Column name based on which the partitions were produced.
class ChunkHandler():
    """ChunkHandler Handler that splits documents into lines and merges lines back into documents.
    """
    def doc2lines(
        self,
        df: DataFrame,
        text_column: str,
        split_symbol: str
    ) -> DataFrame:
        """doc2lines Given a dataframe, splits every document into one row per line.

        Args:
            df (DataFrame): The dataframe object input.
            text_column (str): The column name for the text in the dataframe.
            split_symbol (str): The pattern on which the text is split.

        Returns:
            DataFrame: The input columns with ``text_column`` replaced by a single line per
                row, plus the ``pos`` column produced by ``posexplode``.
        """
        as_array = df.withColumn(text_column, split(text_column, split_symbol, -1))
        # posexplode emits the columns ``pos`` and ``col``; ``col`` takes over the text column's name.
        exploded = as_array.select("*", posexplode(text_column))
        return exploded.drop(text_column).withColumnRenamed("col", text_column)

    def lines2doc(
        self,
        df: DataFrame,
        text_column: str,
        identifier_column: str,
        sort_column: str
    ) -> DataFrame:
        """lines2doc Given a dataframe, merges the lines of each document back into one text.

        Args:
            df (DataFrame): The dataframe object input.
            text_column (str): The column name for the text in the dataframe.
            identifier_column (str): The column based on which the lines need to be grouped into documents.
            sort_column (str): The column that determines the order of the lines inside a document.

        Returns:
            DataFrame: One row per ``identifier_column`` value, with ``text_column`` holding the
                concatenated lines.
        """
        def join_using_symbol(x):
            parts = []
            for line in x:
                if not line:
                    continue
                text = line[text_column]
                # A null text value cannot be concatenated; skip it instead of failing the task.
                if text is None:
                    continue
                # A line starting with two spaces loses exactly one of them.
                parts.append(text[1:] if text[:2] == "  " else text)
            return "".join(parts)

        join_lines = udf(join_using_symbol, StringType())

        # Pair every line with its sort key so the order survives collect_list.
        paired = df.withColumn(text_column, struct([sort_column, text_column])) \
            .select(identifier_column, text_column)
        collected = paired.groupBy(identifier_column) \
            .agg(collect_list(text_column).alias(text_column))
        # array_sort compares structs field by field, so the sort key goes first.
        ordered = collected.withColumn(
            text_column,
            expr(
                f"array_sort(transform({text_column},x->struct(x['{sort_column}'] as {sort_column},x['{text_column}'] as {text_column})))"
            )
        )
        return ordered.withColumn(text_column, join_lines(text_column))
ChunkHandler Handler class that provides utilities for manipulating chunks of text data: splitting documents into lines and merging lines back into documents.
def doc2lines(
    self,
    df: DataFrame,
    text_column: str,
    split_symbol: str
) -> DataFrame:
    """doc2lines Explode every document of the dataframe into one row per line.

    Args:
        df (DataFrame): The dataframe object input.
        text_column (str): The column name for the text in the dataframe.
        split_symbol (str): The pattern on which the text is split.

    Returns:
        DataFrame: The input columns with ``text_column`` replaced by a single line per
            row, plus the ``pos`` column produced by ``posexplode``.
    """
    as_array = df.withColumn(text_column, split(text_column, split_symbol, -1))
    # posexplode emits the columns ``pos`` and ``col``; ``col`` takes over the text column's name.
    exploded = as_array.select("*", posexplode(text_column))
    return exploded.drop(text_column).withColumnRenamed("col", text_column)
doc2lines Given a dataframe, Splits the various documents into multiple lines.
Arguments:
- df (DataFrame): The dataframe object input.
- text_column (str): The column name for the text in the dataframe.
- split_symbol (str): The symbol on which splits need to be done.
Returns:
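DataFrame: The input dataframe with the text column exploded into one row per line, plus a `pos` column holding each line's position within its document.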
def lines2doc(
    self,
    df: DataFrame,
    text_column: str,
    identifier_column: str,
    sort_column: str
) -> DataFrame:
    """lines2doc Given a dataframe, merges the lines of each document back into one text.

    Args:
        df (DataFrame): The dataframe object input.
        text_column (str): The column name for the text in the dataframe.
        identifier_column (str): The column based on which the lines need to be grouped into documents.
        sort_column (str): The column that determines the order of the lines inside a document.

    Returns:
        DataFrame: One row per ``identifier_column`` value, with ``text_column`` holding the
            concatenated lines.
    """
    def join_using_symbol(x):
        parts = []
        for line in x:
            if not line:
                continue
            text = line[text_column]
            # A null text value cannot be concatenated; skip it instead of failing the task.
            if text is None:
                continue
            # A line starting with two spaces loses exactly one of them.
            parts.append(text[1:] if text[:2] == "  " else text)
        return "".join(parts)

    join_lines = udf(join_using_symbol, StringType())

    # Pair every line with its sort key so the order survives collect_list.
    paired = df.withColumn(text_column, struct([sort_column, text_column])) \
        .select(identifier_column, text_column)
    collected = paired.groupBy(identifier_column) \
        .agg(collect_list(text_column).alias(text_column))
    # array_sort compares structs field by field, so the sort key goes first.
    ordered = collected.withColumn(
        text_column,
        expr(
            f"array_sort(transform({text_column},x->struct(x['{sort_column}'] as {sort_column},x['{text_column}'] as {text_column})))"
        )
    )
    return ordered.withColumn(text_column, join_lines(text_column))
lines2doc Given a dataframe, Merges the various lines into documents.
Arguments:
- df (DataFrame): The dataframe object input.
- text_column (str): The column name for the text in the dataframe.
- identifier_column (str): The column based on which the lines need to be grouped into documents.
- sort_column (str): The column that determines the order in which the lines of each document are joined.
Returns:
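DataFrame: One row per identifier value, with the text column holding that document's lines concatenated in sort-column order.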
class SparkOptimizedHandlers():
    """SparkOptimizedHandlers Handler that computes per-document statistics and flags with Spark aggregations.
    """
    def get_num_lines(self, grouped_line_df: DataFrame) -> DataFrame:
        """get_num_lines Count the rows (lines) of every group.

        Args:
            grouped_line_df (DataFrame): The grouped lines, e.g. the result of ``line_df.groupBy(...)``.

        Returns:
            DataFrame: The aggregation result with the count in a ``lines_count`` column.
        """
        return grouped_line_df.agg(count("*").alias("lines_count"))

    def get_mean_line_length(self, grouped_line_df: DataFrame, line_len_col_: str) -> DataFrame:
        """get_mean_line_length Average ``line_len_col_`` over every group.

        Args:
            grouped_line_df (DataFrame): The grouped lines, e.g. the result of ``line_df.groupBy(...)``.
            line_len_col_ (str): Column that represents the line length in a document.

        Returns:
            DataFrame: The aggregation result with the average in a ``mean_line_length`` column.
        """
        return grouped_line_df.agg(avg(line_len_col_).alias("mean_line_length"))

    def get_min_line_length(self, grouped_line_df: DataFrame, line_len_col_: str) -> DataFrame:
        """get_min_line_length Take the minimum of ``line_len_col_`` over every group.

        Args:
            grouped_line_df (DataFrame): The grouped lines, e.g. the result of ``line_df.groupBy(...)``.
            line_len_col_ (str): Column that represents the line length in a document.

        Returns:
            DataFrame: The aggregation result with the minimum in a ``min_line_length`` column.
        """
        return grouped_line_df.agg(min(line_len_col_).alias("min_line_length"))

    def get_max_line_length(self, grouped_line_df: DataFrame, line_len_col_: str) -> DataFrame:
        """get_max_line_length Take the maximum of ``line_len_col_`` over every group.

        Args:
            grouped_line_df (DataFrame): The grouped lines, e.g. the result of ``line_df.groupBy(...)``.
            line_len_col_ (str): Column name that contains the line length for the various document lines.

        Returns:
            DataFrame: The aggregation result with the maximum in a ``max_line_length`` column.
        """
        return grouped_line_df.agg(max(line_len_col_).alias("max_line_length"))

    def get_nsfw_words_count(self, grouped_line_df: DataFrame, line_nsfw_count_col_: str) -> DataFrame:
        """get_nsfw_words_count Sum ``line_nsfw_count_col_`` over every group.

        Args:
            grouped_line_df (DataFrame): The grouped lines, e.g. the result of ``line_df.groupBy(...)``.
            line_nsfw_count_col_ (str): Column name that contains the nsfw word count for the various document lines.

        Returns:
            DataFrame: The aggregation result with the total in a ``nsfw_words_count`` column.
        """
        return grouped_line_df.agg(sum(line_nsfw_count_col_).alias("nsfw_words_count"))

    def get_non_li_words_count(self, grouped_line_df: DataFrame, line_non_li_count_col_: str) -> DataFrame:
        """get_non_li_words_count Sum ``line_non_li_count_col_`` over every group.

        Args:
            grouped_line_df (DataFrame): The grouped lines, e.g. the result of ``line_df.groupBy(...)``.
            line_non_li_count_col_ (str): Column name that contains the non-li count for the various document lines.

        Returns:
            DataFrame: The aggregation result with the total in a ``non_li_char_count`` column.
        """
        # The output column is named "non_li_char_count" although the method name says "words".
        return grouped_line_df.agg(sum(line_non_li_count_col_).alias("non_li_char_count"))

    def get_bytes(self, grouped_line_df: DataFrame, line_bytes_col_: str) -> DataFrame:
        """get_bytes Sum ``line_bytes_col_`` over every group.

        Args:
            grouped_line_df (DataFrame): The grouped lines, e.g. the result of ``line_df.groupBy(...)``.
            line_bytes_col_ (str): Column name that contains the total bytes for the various document lines.

        Returns:
            DataFrame: The aggregation result with the total in a ``bytes`` column.
        """
        return grouped_line_df.agg(sum(line_bytes_col_).alias("bytes"))

    def get_words_count(self, grouped_line_df: DataFrame, line_words_count_col_: str) -> DataFrame:
        """get_words_count Sum ``line_words_count_col_`` over every group.

        Args:
            grouped_line_df (DataFrame): The grouped lines, e.g. the result of ``line_df.groupBy(...)``.
            line_words_count_col_ (str): Column name that contains the word count of the various document lines.

        Returns:
            DataFrame: The aggregation result with the total in a ``words_count`` column.
        """
        return grouped_line_df.agg(sum(line_words_count_col_).alias("words_count"))

    def get_char_count(self, grouped_line_df: DataFrame, line_char_count_col_: str) -> DataFrame:
        """get_char_count Sum ``line_char_count_col_`` over every group.

        Args:
            grouped_line_df (DataFrame): The grouped lines, e.g. the result of ``line_df.groupBy(...)``.
            line_char_count_col_ (str): Column name that contains the char count of the various document lines.

        Returns:
            DataFrame: The aggregation result with the total in a ``char_count`` column.
        """
        return grouped_line_df.agg(sum(line_char_count_col_).alias("char_count"))

    def get_repeated_line_dist(self, line_df: DataFrame, id_col: str, text_col: str) -> DataFrame:
        """get_repeated_line_dist Build, per document, a map from line text to its number of occurrences.

        Args:
            line_df (DataFrame): Dataframe object containing the lines.
            id_col (str): The column based on which the dataframe needs to be grouped by.
            text_col (str): The column name for the text in the dataframe.

        Returns:
            DataFrame: One row per ``id_col`` value with a ``repeated_line_dist`` map column
                (line text -> occurrence count).
        """
        col_name = "repeated_line_dist"

        # Occurrences of every distinct line inside each document.
        per_line = line_df.groupBy(id_col, text_col).agg(count("*").alias(col_name))
        # One single-entry map per distinct line, gathered per document.
        per_doc = per_line.groupBy(id_col) \
            .agg(collect_list(create_map([text_col, col_name])).alias(col_name))
        # Flatten the list of single-entry maps into one map through its key and value arrays.
        merged = per_doc \
            .withColumn("keys", expr(f"transform({col_name}, x -> map_keys(x)[0])")) \
            .withColumn("values", expr(f"transform({col_name}, x -> map_values(x)[0])")) \
            .withColumn(col_name, map_from_arrays(col("keys"), col("values")))
        return merged.drop("keys", "values")

    def run_analysis(
        self,
        line_df: DataFrame,
        doc_id_col: str,
        line_nsfw_count_col_: str,
        line_non_li_count_col_: str,
        line_bytes_col_: str,
        line_words_count_col_: str,
        line_char_count_col_: str,
        only_base_stats: bool = False,
    ) -> DataFrame:
        """run_analysis Method that runs the analysis and aggregates the various stats for the dataframe.

        Args:
            line_df (DataFrame): Dataframe object containing the lines.
            doc_id_col (str): The column based on which the dataframe needs to be grouped by.
            line_nsfw_count_col_ (str): Column name that contains the nsfw word count of the various document lines.
            line_non_li_count_col_ (str): Column name that contains the non latin-indic count of the various document lines.
            line_bytes_col_ (str): Column name that contains the byte count of the various document lines.
            line_words_count_col_ (str): Column name that contains the word count of the various document lines.
            line_char_count_col_ (str): Column name that contains the character count of the various document lines.
            only_base_stats (bool, optional): When True only ``bytes``, ``words_count`` and
                ``char_count`` are computed. Defaults to False.

        Returns:
            DataFrame: One row per ``doc_id_col`` value with the computed statistic columns.
        """
        grouped_line_df = line_df.groupBy(doc_id_col)

        doc_df = self.get_bytes(grouped_line_df, line_bytes_col_) \
            .join(self.get_words_count(grouped_line_df, line_words_count_col_), [doc_id_col]) \
            .join(self.get_char_count(grouped_line_df, line_char_count_col_), [doc_id_col])

        if only_base_stats:
            return doc_df

        # NOTE(review): the line-length statistics read the literal "words_count" column
        # rather than ``line_words_count_col_`` - confirm this is intended.
        line_len_col = "words_count"
        extra_stats = [
            self.get_num_lines(grouped_line_df),
            self.get_mean_line_length(grouped_line_df, line_len_col),
            self.get_min_line_length(grouped_line_df, line_len_col),
            self.get_max_line_length(grouped_line_df, line_len_col),
            self.get_nsfw_words_count(grouped_line_df, line_nsfw_count_col_),
            self.get_non_li_words_count(grouped_line_df, line_non_li_count_col_),
        ]
        for stat_df in extra_stats:
            doc_df = doc_df.join(stat_df, [doc_id_col])

        return doc_df

    def run_flagging(
        self,
        doc_df: DataFrame,
        word_count_col: str,
        char_count_col: str,
        nsfw_count_col: str,
        nsfw_threshold: float,
        non_li_count_col: str,
        non_li_threshold: float,
        min_line_count: int,
        line_count_col: str,
        min_mean_line_len: int,
        mean_line_len_col: str,
    ) -> DataFrame:
        """run_flagging Method that executes the flagging stage based on computed document statistics.

        Args:
            doc_df (DataFrame): The dataframe object containing the various documents.
            word_count_col (str): Column name that contains the word count of the documents.
            char_count_col (str): Column name that contains the character count of the documents.
            nsfw_count_col (str): Column name that contains the nsfw word count of the documents.
            nsfw_threshold (float): Ratio of nsfw words to words at or above which a document is flagged.
            non_li_count_col (str): Column name that contains the non latin-indic count of the documents.
            non_li_threshold (float): Ratio of non latin-indic count to characters at or above which a document is flagged.
            min_line_count (int): Line count at or below which a document is flagged.
            line_count_col (str): Column name that contains the line count of the various documents.
            min_mean_line_len (int): Mean line length at or below which a document is flagged.
            mean_line_len_col (str): Column name that contains the mean line length of the documents.

        Returns:
            DataFrame: ``doc_df`` with the boolean columns ``has_less_lines``,
                ``is_short_lines_heavy``, ``is_nsfw_heavy`` and ``is_non_li_heavy`` appended.
        """
        has_less_lines = when(doc_df[line_count_col] <= min_line_count, True).otherwise(False)
        is_short_lines_heavy = when(doc_df[mean_line_len_col] <= min_mean_line_len, True).otherwise(False)
        is_nsfw_heavy = when(doc_df[nsfw_count_col]/doc_df[word_count_col] >= nsfw_threshold, True).otherwise(False)
        is_non_li_heavy = when(doc_df[non_li_count_col]/doc_df[char_count_col] >= non_li_threshold, True).otherwise(False)

        doc_df = doc_df \
            .select("*", has_less_lines.alias("has_less_lines")) \
            .select("*", is_short_lines_heavy.alias("is_short_lines_heavy")) \
            .select("*", is_nsfw_heavy.alias("is_nsfw_heavy")) \
            .select("*", is_non_li_heavy.alias("is_non_li_heavy"))

        return doc_df
SparkOptimizedHandlers The class representation of a handler object that provides utilities for manipulating the dataframe and computing various statistics.
def get_num_lines(self, grouped_line_df:DataFrame)->DataFrame:
    """get_num_lines Method that counts the number of lines in every group of the grouped line data.

    Args:
        grouped_line_df (DataFrame): The grouped line data, i.e. the object obtained by calling `groupBy` on the line dataframe (this is what `run_analysis` passes in).

    Returns:
        DataFrame: Aggregated dataframe holding the per-group row count in the `lines_count` column.
    """
    # `agg` on the grouped object yields a dataframe, not a scalar.
    return grouped_line_df.agg(count("*").alias("lines_count"))
get_num_lines Method that returns the number of lines present in the dataframe.
Arguments:
- grouped_line_df (DataFrame): Dataframe object containing the grouped lines.
Returns:
DataFrame: Aggregated dataframe holding the number of lines of each group in the `lines_count` column.
def get_mean_line_length(self, grouped_line_df:DataFrame, line_len_col_:str)->DataFrame:
    """get_mean_line_length Method that computes the mean line length for every group of the grouped line data.

    Args:
        grouped_line_df (DataFrame): The grouped line data, i.e. the object obtained by calling `groupBy` on the line dataframe (this is what `run_analysis` passes in).
        line_len_col_ (str): Column that holds the length of each line.

    Returns:
        DataFrame: Aggregated dataframe holding the per-group average of `line_len_col_` in the `mean_line_length` column.
    """
    # `agg` on the grouped object yields a dataframe, not a scalar.
    return grouped_line_df.agg(avg(line_len_col_).alias("mean_line_length"))
get_mean_line_length Method that returns the mean line length of all the lines present in the dataframe.
Arguments:
- grouped_line_df (DataFrame): Dataframe object containing the grouped lines.
- line_len_col_ (str): Column that represents the line length in a document.
Returns:
DataFrame: Aggregated dataframe holding the mean line length of each group in the `mean_line_length` column.
def get_min_line_length(self, grouped_line_df:DataFrame, line_len_col_:str)->DataFrame:
    """get_min_line_length Method that computes the minimum line length for every group of the grouped line data.

    Args:
        grouped_line_df (DataFrame): The grouped line data, i.e. the object obtained by calling `groupBy` on the line dataframe (this is what `run_analysis` passes in).
        line_len_col_ (str): Column that holds the length of each line.

    Returns:
        DataFrame: Aggregated dataframe holding the per-group minimum of `line_len_col_` in the `min_line_length` column.
    """
    # NOTE: `min` is the Spark SQL aggregate imported at the top of the module, not the Python builtin.
    return grouped_line_df.agg(min(line_len_col_).alias("min_line_length"))
get_min_line_length Method that returns the min line length of all the lines present in the dataframe.
Arguments:
- grouped_line_df (DataFrame): Dataframe object containing the grouped lines.
- line_len_col_ (str): Column that represents the line length in a document.
Returns:
DataFrame: Aggregated dataframe holding the minimum line length of each group in the `min_line_length` column.
def get_max_line_length(self, grouped_line_df:DataFrame, line_len_col_:str)->DataFrame:
    """get_max_line_length Method that computes the maximum line length for every group of the grouped line data.

    Args:
        grouped_line_df (DataFrame): The grouped line data, i.e. the object obtained by calling `groupBy` on the line dataframe (this is what `run_analysis` passes in).
        line_len_col_ (str): Column name that contains the line length for the various document lines.

    Returns:
        DataFrame: Aggregated dataframe holding the per-group maximum of `line_len_col_` in the `max_line_length` column.
    """
    # NOTE: `max` is the Spark SQL aggregate imported at the top of the module, not the Python builtin.
    return grouped_line_df.agg(max(line_len_col_).alias("max_line_length"))
get_max_line_length Method that returns the max line length of all the lines present in the dataframe.
Arguments:
- grouped_line_df (DataFrame): Dataframe object containing the grouped lines.
- line_len_col_ (str): Column name that contains the line length for the various document lines.
Returns:
DataFrame: Aggregated dataframe holding the maximum line length of each group in the `max_line_length` column.
def get_nsfw_words_count(self, grouped_line_df:DataFrame, line_nsfw_count_col_:str)->DataFrame:
    """get_nsfw_words_count Method that totals the NSFW word counts for every group of the grouped line data.

    Args:
        grouped_line_df (DataFrame): The grouped line data, i.e. the object obtained by calling `groupBy` on the line dataframe (this is what `run_analysis` passes in).
        line_nsfw_count_col_ (str): Column name that contains the nsfw word count for the various document lines.

    Returns:
        DataFrame: Aggregated dataframe holding the per-group sum of `line_nsfw_count_col_` in the `nsfw_words_count` column.
    """
    # NOTE: `sum` is the Spark SQL aggregate imported at the top of the module, not the Python builtin.
    return grouped_line_df.agg(sum(line_nsfw_count_col_).alias("nsfw_words_count"))
get_nsfw_words_count Method that returns the number of NSFW words present in the dataframe.
Arguments:
- grouped_line_df (DataFrame): Dataframe object containing the grouped lines.
- line_nsfw_count_col_ (str): Column name that contains the nsfw word count for the various document lines.
Returns:
DataFrame: Aggregated dataframe holding the total nsfw word count of each group in the `nsfw_words_count` column.
def get_non_li_words_count(self, grouped_line_df:DataFrame, line_non_li_count_col_:str)->DataFrame:
    """get_non_li_words_count Method that totals the non latin-indic counts for every group of the grouped line data.

    Args:
        grouped_line_df (DataFrame): The grouped line data, i.e. the object obtained by calling `groupBy` on the line dataframe (this is what `run_analysis` passes in).
        line_non_li_count_col_ (str): Column name that contains the non-li count for the various document lines.

    Returns:
        DataFrame: Aggregated dataframe holding the per-group sum of `line_non_li_count_col_` in the `non_li_char_count` column.
    """
    # The output column is named `non_li_char_count` even though the method name says "words";
    # the name is kept because downstream code may select the column by this name.
    # NOTE: `sum` is the Spark SQL aggregate imported at the top of the module, not the Python builtin.
    return grouped_line_df.agg(sum(line_non_li_count_col_).alias("non_li_char_count"))
get_non_li_words_count Method that returns the number of non latin-indic words in the dataframe.
Arguments:
- grouped_line_df (DataFrame): Dataframe object containing the grouped lines.
- line_non_li_count_col_ (str): Column name that contains the non-li word count for the various document lines.
Returns:
DataFrame: Aggregated dataframe holding the total non latin-indic count of each group in the `non_li_char_count` column.
def get_bytes(self, grouped_line_df:DataFrame, line_bytes_col_:str)->DataFrame:
    """get_bytes Method that totals the byte counts for every group of the grouped line data.

    Args:
        grouped_line_df (DataFrame): The grouped line data, i.e. the object obtained by calling `groupBy` on the line dataframe (this is what `run_analysis` passes in).
        line_bytes_col_ (str): Column name that contains the total bytes for the various document lines.

    Returns:
        DataFrame: Aggregated dataframe holding the per-group sum of `line_bytes_col_` in the `bytes` column.
    """
    # NOTE: `sum` is the Spark SQL aggregate imported at the top of the module, not the Python builtin.
    return grouped_line_df.agg(sum(line_bytes_col_).alias("bytes"))
get_bytes Method that returns the total bytes that represent the data present in the dataframe.
Arguments:
- grouped_line_df (DataFrame): Dataframe object containing the grouped lines.
- line_bytes_col_ (str): Column name that contains the total bytes for the various document lines.
Returns:
DataFrame: Aggregated dataframe holding the total bytes of each group in the `bytes` column.
def get_words_count(self, grouped_line_df:DataFrame, line_words_count_col_:str)->DataFrame:
    """get_words_count Method that totals the word counts for every group of the grouped line data.

    Args:
        grouped_line_df (DataFrame): The grouped line data, i.e. the object obtained by calling `groupBy` on the line dataframe (this is what `run_analysis` passes in).
        line_words_count_col_ (str): Column name that contains the word count of the various document lines.

    Returns:
        DataFrame: Aggregated dataframe holding the per-group sum of `line_words_count_col_` in the `words_count` column.
    """
    # NOTE: `sum` is the Spark SQL aggregate imported at the top of the module, not the Python builtin.
    return grouped_line_df.agg(sum(line_words_count_col_).alias("words_count"))
get_words_count Method that returns the total word count present in the dataframe.
Arguments:
- grouped_line_df (DataFrame): Dataframe object containing the grouped lines.
- line_words_count_col_ (str): Column name that contains the word count of the various document lines.
Returns:
DataFrame: Aggregated dataframe holding the total word count of each group in the `words_count` column.
def get_char_count(self, grouped_line_df:DataFrame, line_char_count_col_:str)->DataFrame:
    """get_char_count Method that totals the character counts for every group of the grouped line data.

    Args:
        grouped_line_df (DataFrame): The grouped line data, i.e. the object obtained by calling `groupBy` on the line dataframe (this is what `run_analysis` passes in).
        line_char_count_col_ (str): Column name that contains the char count of the various document lines.

    Returns:
        DataFrame: Aggregated dataframe holding the per-group sum of `line_char_count_col_` in the `char_count` column.
    """
    # NOTE: `sum` is the Spark SQL aggregate imported at the top of the module, not the Python builtin.
    return grouped_line_df.agg(sum(line_char_count_col_).alias("char_count"))
get_char_count Method that returns the total char count present in the dataframe.
Arguments:
- grouped_line_df (DataFrame): Dataframe object containing the grouped lines.
- line_char_count_col_ (str): Column name that contains the char count of the various document lines.
Returns:
DataFrame: Aggregated dataframe holding the total character count of each group in the `char_count` column.
def get_repeated_line_dist(self, line_df:DataFrame, id_col:str, text_col:str)->DataFrame:
    """get_repeated_line_dist Method that builds, for every document, a map from each distinct line to the number of times it occurs.

    No distance between lines is computed: the result is an occurrence count per distinct line text.

    Args:
        line_df (DataFrame): Dataframe object containing the lines.
        id_col (str): The column based on which the dataframe needs to be grouped by.
        text_col (str): The column name for the text in the dataframe.

    Returns:
        DataFrame: Dataframe grouped by `id_col` with a `repeated_line_dist` map column whose keys are the line texts and whose values are their occurrence counts.
    """
    col_name = "repeated_line_dist"

    # Step 1: number of occurrences of every distinct line within a document.
    line_counts = line_df \
        .groupBy(id_col, text_col) \
        .agg(count("*").alias(col_name))

    # Step 2: collect the single-entry {text: count} maps of a document into one list.
    per_doc_maps = line_counts \
        .groupBy(id_col) \
        .agg(collect_list(create_map([text_col, col_name])).alias(col_name))

    # Step 3: flatten the list of single-entry maps into one map by pulling out
    # the keys and values separately and zipping them back together.
    repeated_line_dist = per_doc_maps \
        .withColumn("keys", expr(f"transform({col_name}, x -> map_keys(x)[0])")) \
        .withColumn("values", expr(f"transform({col_name}, x -> map_values(x)[0])")) \
        .withColumn(col_name, map_from_arrays(col("keys"), col("values"))) \
        .drop("keys", "values")

    return repeated_line_dist
get_repeated_line_dist Method that builds, for every document, a map from each distinct line to the number of times it occurs.
Arguments:
- line_df (DataFrame): Dataframe object containing the lines.
- id_col (str): The column based on which the dataframe needs to be grouped by.
- text_col (str): The column name for the text in the dataframe.
Returns:
DataFrame: Dataframe grouped by `id_col` with a `repeated_line_dist` map column (line text -> occurrence count).
def run_analysis(
    self,
    line_df:DataFrame,
    doc_id_col:str,
    line_nsfw_count_col_:str,
    line_non_li_count_col_:str,
    line_bytes_col_:str,
    line_words_count_col_:str,
    line_char_count_col_:str,
    only_base_stats:bool=False,
) -> DataFrame:
    """run_analysis Method that runs the analysis and aggregates the various stats for the dataframe.

    The lines are grouped by `doc_id_col`, every statistic is aggregated separately and the
    per-statistic dataframes are joined back together on `doc_id_col`.

    Args:
        line_df (DataFrame): Dataframe object containing the lines.
        doc_id_col (str): The column based on which the dataframe needs to be grouped by.
        line_nsfw_count_col_ (str): Column name that contains the nsfw word count of the various document lines.
        line_non_li_count_col_ (str): Column name that contains the non latin-indic count of the various document lines.
        line_bytes_col_ (str): Column name that contains the byte count of the various document lines.
        line_words_count_col_ (str): Column name that contains the word count of the various document lines. It is also the per-line length used for the mean/min/max line length statistics.
        line_char_count_col_ (str): Column name that contains the character count of the various document lines.
        only_base_stats (bool, optional): If True, only the base statistics (`bytes`, `words_count`, `char_count`) are computed. Defaults to False.

    Returns:
        DataFrame: One row per `doc_id_col` value with the columns `bytes`, `words_count` and `char_count`, plus `lines_count`, `mean_line_length`, `min_line_length`, `max_line_length`, `nsfw_words_count` and `non_li_char_count` when `only_base_stats` is False.
    """
    grouped_line_df = line_df.groupBy(doc_id_col)

    # Base statistics are always computed; the list order fixes the output column order.
    stat_dfs = [
        self.get_bytes(grouped_line_df, line_bytes_col_),
        self.get_words_count(grouped_line_df, line_words_count_col_),
        self.get_char_count(grouped_line_df, line_char_count_col_),
    ]

    if not only_base_stats:
        # Line length is measured in words: use the caller-supplied word count
        # column rather than a hard-coded "words_count" name, so the method also
        # works when the line dataframe names that column differently.
        stat_dfs += [
            self.get_num_lines(grouped_line_df),
            self.get_mean_line_length(grouped_line_df, line_words_count_col_),
            self.get_min_line_length(grouped_line_df, line_words_count_col_),
            self.get_max_line_length(grouped_line_df, line_words_count_col_),
            self.get_nsfw_words_count(grouped_line_df, line_nsfw_count_col_),
            self.get_non_li_words_count(grouped_line_df, line_non_li_count_col_),
        ]

    # Fold the per-statistic dataframes into a single document-level dataframe.
    doc_df = stat_dfs[0]
    for stat_df in stat_dfs[1:]:
        doc_df = doc_df.join(stat_df, [doc_id_col])

    return doc_df
run_analysis Method that runs the analysis and aggregates the various stats for the dataframe.
Arguments:
- line_df (DataFrame): Dataframe object containing the lines.
- doc_id_col (str): The column based on which the dataframe needs to be grouped by.
- line_nsfw_count_col_ (str): Column name that contains the nsfw word count of the various document lines.
- line_non_li_count_col_ (str): Column name that contains the non latin-indic word count of the various document lines.
- line_bytes_col_ (str): Column name that contains the byte count of the various document lines.
- line_words_count_col_ (str): Column name that contains the word count of the various document lines.
- line_char_count_col_ (str): Column name that contains the character count of the various document lines.
- only_base_stats (bool, optional): If only return the basic statistic values. Defaults to False.
Returns:
DataFrame: Returns the dataframe with computed statistic values.
def run_flagging(
    self,
    doc_df:DataFrame,
    word_count_col:str,
    char_count_col:str,
    nsfw_count_col:str,
    nsfw_threshold:float,
    non_li_count_col:str,
    non_li_threshold:float,
    min_line_count:int,
    line_count_col:str,
    min_mean_line_len:int,
    mean_line_len_col:str,
)->DataFrame:
    """run_flagging Method that appends boolean quality flags derived from the document statistics.

    Args:
        doc_df (DataFrame): The dataframe object containing the various documents.
        word_count_col (str): Column name that contains the word count of the documents.
        char_count_col (str): Column name that contains the character count of the documents.
        nsfw_count_col (str): Column name that contains the nsfw word count of the documents.
        nsfw_threshold (float): A document is flagged when its nsfw count divided by its word count reaches this value.
        non_li_count_col (str): Column name that contains the non latin-indic count of the documents.
        non_li_threshold (float): A document is flagged when its non latin-indic count divided by its character count reaches this value.
        min_line_count (int): A document is flagged when its line count is less than or equal to this value.
        line_count_col (str): Column name that contains the line count of the various documents.
        min_mean_line_len (int): A document is flagged when its mean line length is less than or equal to this value.
        mean_line_len_col (str): Column name that contains the mean line length of the documents.

    Returns:
        DataFrame: The input dataframe with the boolean columns `has_less_lines`, `is_short_lines_heavy`, `is_nsfw_heavy` and `is_non_li_heavy` appended, in that order.
    """
    # Ratios the two "heavy" flags are based on.
    nsfw_ratio = doc_df[nsfw_count_col]/doc_df[word_count_col]
    non_li_ratio = doc_df[non_li_count_col]/doc_df[char_count_col]

    # (flag column name, condition under which the flag is True), in output order.
    flag_rules = [
        ("has_less_lines", doc_df[line_count_col] <= min_line_count),
        ("is_short_lines_heavy", doc_df[mean_line_len_col] <= min_mean_line_len),
        ("is_nsfw_heavy", nsfw_ratio >= nsfw_threshold),
        ("is_non_li_heavy", non_li_ratio >= non_li_threshold),
    ]

    flagged_df = doc_df
    for flag_name, condition in flag_rules:
        # `otherwise(False)` keeps every flag a plain True/False value.
        flag_col = when(condition, True).otherwise(False).alias(flag_name)
        flagged_df = flagged_df.select("*", flag_col)

    return flagged_df
run_flagging Method that executes the flagging stage based on computed document statistics.
Arguments:
- doc_df (DataFrame): The dataframe object containing the various documents.
- word_count_col (str): Column name that contains the word count of the various document lines.
- char_count_col (str): Column name that contains the character count of the various document lines.
- nsfw_count_col (str): Column name that contains the nsfw word count of the various document lines.
- nsfw_threshold (float): Threshold value for number of NSFW words acceptable.
- non_li_count_col (str): Column name that contains the non latin-indic word count of the various document lines.
- non_li_threshold (float): Threshold value for number of non latin-indic words.
- min_line_count (int): Threshold value for minimum number of lines to constitute a document.
- line_count_col (str): Column name that contains the line count of the various documents.
- min_mean_line_len (int): Threshold value for the mean line length.
- mean_line_len_col (str): Column name that contains the mean line length of the various document lines.
Returns:
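DataFrame: The input dataframe with the boolean flag columns `has_less_lines`, `is_short_lines_heavy`, `is_nsfw_heavy` and `is_non_li_heavy` appended.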