setu.utilities

  1import argparse
  2import os 
  3import shutil
  4from pyspark.sql import DataFrame
  5from pyspark.sql.functions import (
  6    split,   
  7    count,
  8    sum,
  9    avg,
 10    udf,
 11    min,
 12    max,
 13    when,
 14    col,
 15    create_map,
 16    collect_list,
 17    expr,
 18    map_from_arrays,
 19    posexplode,
 20    struct,
 21)
 22
 23from pyspark.sql.types import (
 24    StringType, 
 25)
 26
 27
 28
 29def str2bool(v:str) -> bool:
 30    """str2bool Returns the boolean equivalent given various string representations of the True/False values.
 31
 32    Args:
 33        v (str): A string that might represent a boolean value.
 34
 35    Raises:
 36        argparse.ArgumentTypeError: Error that mentions the provided value does not represent a boolean value.
 37
 38    Returns:
 39        bool : Returns the bool equivalent of the provided value.
 40    """
 41    if v.lower() in ('yes', 'true', 't', 'y', '1'):
 42        return True
 43    elif v.lower() in ('no', 'false', 'f', 'n', '0'):
 44        return False
 45    else:
 46        raise argparse.ArgumentTypeError('Boolean value expected.')
 47    
 48def list_of_strings(arg:str) -> list:
 49    """list_of_strings Generate the list of strings provided a single string.
 50
 51    Args:
 52        arg (str): The string argument which needs to be split.
 53
 54    Returns:
 55        list: List of strings of the string split using ',' delimitter.
 56    """
 57    return arg.split(',')
 58
 59def rename_partitioned_directories(base_dir:str, partition_column_name:str): 
 60    """rename_partitioned_directories Function that renames the partitioned directiories.
 61
 62    Args:
 63        base_dir (str): Base directory path
 64        partition_column_name (str): Column name based on which the partitions were produced.
 65    """
 66    for dir_name in os.listdir(base_dir):
 67        if dir_name.startswith(partition_column_name + "="):
 68            new_name = dir_name.split("=")[1]
 69            old_path = os.path.join(base_dir, dir_name)
 70            new_path = os.path.join(base_dir, new_name)
 71            shutil.move(old_path, new_path)
 72    
 73class ChunkHandler():
 74    """ChunkHandler The Class representation for the a handler object that provides utilities that manipulates various chunks of text data.
 75    """
 76    def doc2lines(
 77            self,
 78            df:DataFrame,
 79            text_column:str,
 80            split_symbol:str
 81    ) -> DataFrame:
 82        """doc2lines Given a dataframe, Splits the various documents into multiple lines.
 83
 84        Args:
 85            df (DataFrame): The dataframe object input.
 86            text_column (str): The column name for the text in the dataframe.
 87            split_symbol (str): The symbol on which splits need to be done.
 88
 89        Returns:
 90            DataFrame: _description_
 91        """
 92        df = df \
 93            .withColumn(text_column, split(text_column, split_symbol, -1)) \
 94            .select("*", posexplode(text_column)).drop(text_column).withColumnRenamed("col", text_column) \
 95        
 96        return df
 97    
 98    def lines2doc(
 99            self,
100            df:DataFrame,
101            text_column:str,
102            identifier_column:str,
103            sort_column:str
104    )->DataFrame:
105        """lines2doc Given a dataframe, Merges the various lines into documents.
106
107        Args:
108            df (DataFrame): The dataframe object input.
109            text_column (str): The column name for the text in the dataframe.
110            identifier_column (str): The column based on which the lines need to be grouped into documents.
111            sort_column (str): The column based on which the final dataframe needs to be sorted.
112
113        Returns:
114            DataFrame: _description_
115        """
116        def join_using_symbol(x):
117            lines = []
118            for line in x:
119                if line:
120                    lines += [line[text_column]]
121            
122            text = ""
123            for line in lines:
124                if len(line) >= 2 and line[0] == " " and line[1] == " ":
125                    text += line[1:]
126                else:
127                    text += line
128            return text
129        
130        join_lines = udf(join_using_symbol, StringType())
131
132        df = df.withColumn(text_column, struct([sort_column, text_column])).select(identifier_column, text_column) \
133                .groupBy(identifier_column) \
134                .agg(collect_list(text_column).alias(text_column)) \
135                .withColumn(
136                    text_column,
137                    expr(
138                        f"array_sort(transform({text_column},x->struct(x['{sort_column}'] as {sort_column},x['{text_column}'] as {text_column})))"
139                    )   
140                ) \
141                .withColumn(text_column, join_lines(text_column))
142
143        return df
144
class SparkOptimizedHandlers():
    """Handler object providing Spark-optimized aggregation utilities that
    compute per-document statistics and flag documents against thresholds.

    NOTE(review): the original code annotated the aggregation helpers as
    returning ``int``; they in fact return Spark dataframes (the result of
    ``GroupedData.agg``), so the annotations below were corrected to
    ``DataFrame``. The ``grouped_line_df`` arguments are really
    ``pyspark.sql.GroupedData`` objects (the result of ``groupBy``); the
    ``DataFrame`` annotation is kept for backward compatibility with the
    original signatures.
    """

    def get_num_lines(self, grouped_line_df: DataFrame) -> DataFrame:
        """Count the number of lines per document group.

        Args:
            grouped_line_df (DataFrame): Grouped lines (result of ``groupBy``).

        Returns:
            DataFrame: One row per group with a ``lines_count`` column.
        """
        lines_count = grouped_line_df.agg(count("*").alias("lines_count"))
        return lines_count

    def get_mean_line_length(self, grouped_line_df: DataFrame, line_len_col_: str) -> DataFrame:
        """Compute the mean line length per document group.

        Args:
            grouped_line_df (DataFrame): Grouped lines (result of ``groupBy``).
            line_len_col_ (str): Column that represents the line length.

        Returns:
            DataFrame: One row per group with a ``mean_line_length`` column.
        """
        mean_line_lengths = grouped_line_df.agg(avg(line_len_col_).alias("mean_line_length"))
        return mean_line_lengths

    def get_min_line_length(self, grouped_line_df: DataFrame, line_len_col_: str) -> DataFrame:
        """Compute the minimum line length per document group.

        Args:
            grouped_line_df (DataFrame): Grouped lines (result of ``groupBy``).
            line_len_col_ (str): Column that represents the line length.

        Returns:
            DataFrame: One row per group with a ``min_line_length`` column.
        """
        min_line_lengths_col = grouped_line_df.agg(min(line_len_col_).alias("min_line_length"))
        return min_line_lengths_col

    def get_max_line_length(self, grouped_line_df: DataFrame, line_len_col_: str) -> DataFrame:
        """Compute the maximum line length per document group.

        Args:
            grouped_line_df (DataFrame): Grouped lines (result of ``groupBy``).
            line_len_col_ (str): Column that contains the line length.

        Returns:
            DataFrame: One row per group with a ``max_line_length`` column.
        """
        max_line_lengths = grouped_line_df.agg(max(line_len_col_).alias("max_line_length"))
        return max_line_lengths

    def get_nsfw_words_count(self, grouped_line_df: DataFrame, line_nsfw_count_col_: str) -> DataFrame:
        """Sum the NSFW word counts per document group.

        Args:
            grouped_line_df (DataFrame): Grouped lines (result of ``groupBy``).
            line_nsfw_count_col_ (str): Column that contains the per-line nsfw word count.

        Returns:
            DataFrame: One row per group with a ``nsfw_words_count`` column.
        """
        nsfw_count = grouped_line_df.agg(sum(line_nsfw_count_col_).alias("nsfw_words_count"))
        return nsfw_count

    def get_non_li_words_count(self, grouped_line_df: DataFrame, line_non_li_count_col_: str) -> DataFrame:
        """Sum the non latin-indic counts per document group.

        Args:
            grouped_line_df (DataFrame): Grouped lines (result of ``groupBy``).
            line_non_li_count_col_ (str): Column that contains the per-line non-li count.

        Returns:
            DataFrame: One row per group with a ``non_li_char_count`` column.
        """
        non_li_count = grouped_line_df.agg(sum(line_non_li_count_col_).alias("non_li_char_count"))
        return non_li_count

    def get_bytes(self, grouped_line_df: DataFrame, line_bytes_col_: str) -> DataFrame:
        """Sum the byte counts per document group.

        Args:
            grouped_line_df (DataFrame): Grouped lines (result of ``groupBy``).
            line_bytes_col_ (str): Column that contains the per-line byte count.

        Returns:
            DataFrame: One row per group with a ``bytes`` column.
        """
        bytes_ = grouped_line_df.agg(sum(line_bytes_col_).alias("bytes"))
        return bytes_

    def get_words_count(self, grouped_line_df: DataFrame, line_words_count_col_: str) -> DataFrame:
        """Sum the word counts per document group.

        Args:
            grouped_line_df (DataFrame): Grouped lines (result of ``groupBy``).
            line_words_count_col_ (str): Column that contains the per-line word count.

        Returns:
            DataFrame: One row per group with a ``words_count`` column.
        """
        word_count = grouped_line_df.agg(sum(line_words_count_col_).alias("words_count"))
        return word_count

    def get_char_count(self, grouped_line_df: DataFrame, line_char_count_col_: str) -> DataFrame:
        """Sum the character counts per document group.

        Args:
            grouped_line_df (DataFrame): Grouped lines (result of ``groupBy``).
            line_char_count_col_ (str): Column that contains the per-line char count.

        Returns:
            DataFrame: One row per group with a ``char_count`` column.
        """
        char_count = grouped_line_df.agg(sum(line_char_count_col_).alias("char_count"))
        return char_count

    def get_repeated_line_dist(self, line_df: DataFrame, id_col: str, text_col: str) -> DataFrame:
        """Build, per document, a map from line text to its occurrence count.

        Args:
            line_df (DataFrame): Dataframe object containing the lines.
            id_col (str): The column based on which the dataframe needs to be grouped by.
            text_col (str): The column name for the text in the dataframe.

        Returns:
            DataFrame: One row per ``id_col`` value with a
            ``repeated_line_dist`` map column (line text -> count).
        """
        col_name = "repeated_line_dist"

        # Count duplicate lines per document, then fold the per-line counts
        # into a single map column per document.
        repeated_line_dist = line_df.groupBy(id_col, text_col) \
                                    .agg(count("*").alias(col_name)) \
                                    .groupBy(id_col) \
                                    .agg(collect_list(create_map([text_col, col_name])).alias(col_name)) \
                                    .withColumn("keys", expr(f"transform({col_name}, x -> map_keys(x)[0])")) \
                                    .withColumn("values", expr(f"transform({col_name}, x -> map_values(x)[0])")) \
                                    .withColumn(col_name, map_from_arrays(col("keys"), col("values"))) \
                                    .drop("keys", "values")

        return repeated_line_dist

    def run_analysis(
        self,
        line_df: DataFrame,
        doc_id_col: str,
        line_nsfw_count_col_: str,
        line_non_li_count_col_: str,
        line_bytes_col_: str,
        line_words_count_col_: str,
        line_char_count_col_: str,
        only_base_stats: bool = False,
    ) -> DataFrame:
        """Aggregate per-document statistics from a dataframe of lines.

        Args:
            line_df (DataFrame): Dataframe object containing the lines.
            doc_id_col (str): The column based on which the dataframe needs to be grouped by.
            line_nsfw_count_col_ (str): Column containing the per-line nsfw word count.
            line_non_li_count_col_ (str): Column containing the per-line non latin-indic count.
            line_bytes_col_ (str): Column containing the per-line byte count.
            line_words_count_col_ (str): Column containing the per-line word count.
            line_char_count_col_ (str): Column containing the per-line character count.
            only_base_stats (bool, optional): If True, compute only bytes,
                word count and char count. Defaults to False.

        Returns:
            DataFrame: One row per document with the computed statistic columns.
        """
        grouped_line_df = line_df.groupBy(doc_id_col)
        bytes_df = self.get_bytes(grouped_line_df, line_bytes_col_)
        words_count_df = self.get_words_count(grouped_line_df, line_words_count_col_)
        char_count_df = self.get_char_count(grouped_line_df, line_char_count_col_)

        doc_df = bytes_df \
                    .join(words_count_df, [doc_id_col]) \
                    .join(char_count_df, [doc_id_col])

        if not only_base_stats:
            num_lines_df = self.get_num_lines(grouped_line_df)
            nsfw_words_count_df = self.get_nsfw_words_count(grouped_line_df, line_nsfw_count_col_)
            non_li_words_count_df = self.get_non_li_words_count(grouped_line_df, line_non_li_count_col_)
            # Line-length stats are computed over the per-line word counts.
            mean_line_len_df = self.get_mean_line_length(grouped_line_df, "words_count")
            min_line_len_df = self.get_min_line_length(grouped_line_df, "words_count")
            max_line_len_df = self.get_max_line_length(grouped_line_df, "words_count")

            doc_df = doc_df \
                        .join(num_lines_df, [doc_id_col]) \
                        .join(mean_line_len_df, [doc_id_col]) \
                        .join(min_line_len_df, [doc_id_col]) \
                        .join(max_line_len_df, [doc_id_col]) \
                        .join(nsfw_words_count_df, [doc_id_col]) \
                        .join(non_li_words_count_df, [doc_id_col])

        return doc_df

    def run_flagging(
        self,
        doc_df: DataFrame,
        word_count_col: str,
        char_count_col: str,
        nsfw_count_col: str,
        nsfw_threshold: float,
        non_li_count_col: str,
        non_li_threshold: float,
        min_line_count: int,
        line_count_col: str,
        min_mean_line_len: int,
        mean_line_len_col: str,
    ) -> DataFrame:
        """Add boolean flag columns to documents based on computed statistics.

        Args:
            doc_df (DataFrame): The dataframe object containing the various documents.
            word_count_col (str): Column containing the document word count.
            char_count_col (str): Column containing the document character count.
            nsfw_count_col (str): Column containing the document nsfw word count.
            nsfw_threshold (float): Threshold ratio of NSFW words to total words.
            non_li_count_col (str): Column containing the document non latin-indic count.
            non_li_threshold (float): Threshold ratio of non latin-indic chars to total chars.
            min_line_count (int): Threshold value for minimum number of lines to constitute a document.
            line_count_col (str): Column containing the document line count.
            min_mean_line_len (int): Threshold value for the mean line length.
            mean_line_len_col (str): Column containing the document mean line length.

        Returns:
            DataFrame: ``doc_df`` with the boolean columns ``has_less_lines``,
            ``is_short_lines_heavy``, ``is_nsfw_heavy`` and ``is_non_li_heavy`` appended.
        """
        doc_df = doc_df \
                .select("*", when(doc_df[line_count_col] <= min_line_count, True).otherwise(False).alias("has_less_lines")) \
                .select("*", when(doc_df[mean_line_len_col] <= min_mean_line_len, True).otherwise(False).alias("is_short_lines_heavy")) \
                .select("*", when(doc_df[nsfw_count_col]/doc_df[word_count_col] >= nsfw_threshold, True).otherwise(False).alias("is_nsfw_heavy")) \
                .select("*", when(doc_df[non_li_count_col]/doc_df[char_count_col] >= non_li_threshold, True).otherwise(False).alias("is_non_li_heavy"))

        return doc_df
def str2bool(v: str) -> bool:
30def str2bool(v:str) -> bool:
31    """str2bool Returns the boolean equivalent given various string representations of the True/False values.
32
33    Args:
34        v (str): A string that might represent a boolean value.
35
36    Raises:
37        argparse.ArgumentTypeError: Error that mentions the provided value does not represent a boolean value.
38
39    Returns:
40        bool : Returns the bool equivalent of the provided value.
41    """
42    if v.lower() in ('yes', 'true', 't', 'y', '1'):
43        return True
44    elif v.lower() in ('no', 'false', 'f', 'n', '0'):
45        return False
46    else:
47        raise argparse.ArgumentTypeError('Boolean value expected.')

str2bool Returns the boolean equivalent given various string representations of the True/False values.

Arguments:
  • v (str): A string that might represent a boolean value.
Raises:
  • argparse.ArgumentTypeError: Error that mentions the provided value does not represent a boolean value.
Returns:

bool : Returns the bool equivalent of the provided value.

def list_of_strings(arg: str) -> list:
49def list_of_strings(arg:str) -> list:
50    """list_of_strings Generate the list of strings provided a single string.
51
52    Args:
53        arg (str): The string argument which needs to be split.
54
55    Returns:
56        list: List of strings of the string split using ',' delimitter.
57    """
58    return arg.split(',')

list_of_strings Generate the list of strings provided a single string.

Arguments:
  • arg (str): The string argument which needs to be split.
Returns:

list: List of strings of the string split using ',' delimiter.

def rename_partitioned_directories(base_dir: str, partition_column_name: str):
60def rename_partitioned_directories(base_dir:str, partition_column_name:str): 
61    """rename_partitioned_directories Function that renames the partitioned directiories.
62
63    Args:
64        base_dir (str): Base directory path
65        partition_column_name (str): Column name based on which the partitions were produced.
66    """
67    for dir_name in os.listdir(base_dir):
68        if dir_name.startswith(partition_column_name + "="):
69            new_name = dir_name.split("=")[1]
70            old_path = os.path.join(base_dir, dir_name)
71            new_path = os.path.join(base_dir, new_name)
72            shutil.move(old_path, new_path)

rename_partitioned_directories Function that renames the partitioned directories.

Arguments:
  • base_dir (str): Base directory path
  • partition_column_name (str): Column name based on which the partitions were produced.
class ChunkHandler:
 74class ChunkHandler():
 75    """ChunkHandler The Class representation for the a handler object that provides utilities that manipulates various chunks of text data.
 76    """
 77    def doc2lines(
 78            self,
 79            df:DataFrame,
 80            text_column:str,
 81            split_symbol:str
 82    ) -> DataFrame:
 83        """doc2lines Given a dataframe, Splits the various documents into multiple lines.
 84
 85        Args:
 86            df (DataFrame): The dataframe object input.
 87            text_column (str): The column name for the text in the dataframe.
 88            split_symbol (str): The symbol on which splits need to be done.
 89
 90        Returns:
 91            DataFrame: _description_
 92        """
 93        df = df \
 94            .withColumn(text_column, split(text_column, split_symbol, -1)) \
 95            .select("*", posexplode(text_column)).drop(text_column).withColumnRenamed("col", text_column) \
 96        
 97        return df
 98    
 99    def lines2doc(
100            self,
101            df:DataFrame,
102            text_column:str,
103            identifier_column:str,
104            sort_column:str
105    )->DataFrame:
106        """lines2doc Given a dataframe, Merges the various lines into documents.
107
108        Args:
109            df (DataFrame): The dataframe object input.
110            text_column (str): The column name for the text in the dataframe.
111            identifier_column (str): The column based on which the lines need to be grouped into documents.
112            sort_column (str): The column based on which the final dataframe needs to be sorted.
113
114        Returns:
115            DataFrame: _description_
116        """
117        def join_using_symbol(x):
118            lines = []
119            for line in x:
120                if line:
121                    lines += [line[text_column]]
122            
123            text = ""
124            for line in lines:
125                if len(line) >= 2 and line[0] == " " and line[1] == " ":
126                    text += line[1:]
127                else:
128                    text += line
129            return text
130        
131        join_lines = udf(join_using_symbol, StringType())
132
133        df = df.withColumn(text_column, struct([sort_column, text_column])).select(identifier_column, text_column) \
134                .groupBy(identifier_column) \
135                .agg(collect_list(text_column).alias(text_column)) \
136                .withColumn(
137                    text_column,
138                    expr(
139                        f"array_sort(transform({text_column},x->struct(x['{sort_column}'] as {sort_column},x['{text_column}'] as {text_column})))"
140                    )   
141                ) \
142                .withColumn(text_column, join_lines(text_column))
143
144        return df

ChunkHandler The class representation for a handler object that provides utilities to manipulate various chunks of text data.

def doc2lines( self, df: pyspark.sql.dataframe.DataFrame, text_column: str, split_symbol: str) -> pyspark.sql.dataframe.DataFrame:
77    def doc2lines(
78            self,
79            df:DataFrame,
80            text_column:str,
81            split_symbol:str
82    ) -> DataFrame:
83        """doc2lines Given a dataframe, Splits the various documents into multiple lines.
84
85        Args:
86            df (DataFrame): The dataframe object input.
87            text_column (str): The column name for the text in the dataframe.
88            split_symbol (str): The symbol on which splits need to be done.
89
90        Returns:
91            DataFrame: _description_
92        """
93        df = df \
94            .withColumn(text_column, split(text_column, split_symbol, -1)) \
95            .select("*", posexplode(text_column)).drop(text_column).withColumnRenamed("col", text_column) \
96        
97        return df

doc2lines Given a dataframe, Splits the various documents into multiple lines.

Arguments:
  • df (DataFrame): The dataframe object input.
  • text_column (str): The column name for the text in the dataframe.
  • split_symbol (str): The symbol on which splits need to be done.
Returns:

DataFrame: Dataframe with each document split into one row per line.

def lines2doc( self, df: pyspark.sql.dataframe.DataFrame, text_column: str, identifier_column: str, sort_column: str) -> pyspark.sql.dataframe.DataFrame:
 99    def lines2doc(
100            self,
101            df:DataFrame,
102            text_column:str,
103            identifier_column:str,
104            sort_column:str
105    )->DataFrame:
106        """lines2doc Given a dataframe, Merges the various lines into documents.
107
108        Args:
109            df (DataFrame): The dataframe object input.
110            text_column (str): The column name for the text in the dataframe.
111            identifier_column (str): The column based on which the lines need to be grouped into documents.
112            sort_column (str): The column based on which the final dataframe needs to be sorted.
113
114        Returns:
115            DataFrame: _description_
116        """
117        def join_using_symbol(x):
118            lines = []
119            for line in x:
120                if line:
121                    lines += [line[text_column]]
122            
123            text = ""
124            for line in lines:
125                if len(line) >= 2 and line[0] == " " and line[1] == " ":
126                    text += line[1:]
127                else:
128                    text += line
129            return text
130        
131        join_lines = udf(join_using_symbol, StringType())
132
133        df = df.withColumn(text_column, struct([sort_column, text_column])).select(identifier_column, text_column) \
134                .groupBy(identifier_column) \
135                .agg(collect_list(text_column).alias(text_column)) \
136                .withColumn(
137                    text_column,
138                    expr(
139                        f"array_sort(transform({text_column},x->struct(x['{sort_column}'] as {sort_column},x['{text_column}'] as {text_column})))"
140                    )   
141                ) \
142                .withColumn(text_column, join_lines(text_column))
143
144        return df

lines2doc Given a dataframe, Merges the various lines into documents.

Arguments:
  • df (DataFrame): The dataframe object input.
  • text_column (str): The column name for the text in the dataframe.
  • identifier_column (str): The column based on which the lines need to be grouped into documents.
  • sort_column (str): The column based on which the final dataframe needs to be sorted.
Returns:

DataFrame: Dataframe with the lines merged back into one document per identifier.

class SparkOptimizedHandlers:
class SparkOptimizedHandlers():
    """A handler class providing Spark-optimized utilities that aggregate
    per-line statistics into per-document statistics and flag documents
    against configurable quality thresholds.

    NOTE(review): the ``grouped_line_df`` parameters are the result of a
    ``DataFrame.groupBy(...)`` call (i.e. ``pyspark.sql.GroupedData``), not a
    plain ``DataFrame``. The annotations keep ``DataFrame`` because
    ``GroupedData`` is not imported in this module; the docstrings state the
    actual expectation. All ``get_*`` helpers return lazy ``DataFrame``
    aggregations (one row per group), not plain Python ints.
    """
    def get_num_lines(self, grouped_line_df:DataFrame)->DataFrame:
        """get_num_lines Returns the per-group line count.

        Args:
            grouped_line_df (DataFrame): Grouped lines (a ``groupBy`` result).

        Returns:
            DataFrame: One row per group with a ``lines_count`` column.
        """
        lines_count = grouped_line_df.agg(count("*").alias("lines_count"))
        return lines_count

    def get_mean_line_length(self, grouped_line_df:DataFrame, line_len_col_:str)->DataFrame:
        """get_mean_line_length Returns the per-group mean line length.

        Args:
            grouped_line_df (DataFrame): Grouped lines (a ``groupBy`` result).
            line_len_col_ (str): Column holding each line's length.

        Returns:
            DataFrame: One row per group with a ``mean_line_length`` column.
        """
        mean_line_lengths = grouped_line_df.agg(avg(line_len_col_).alias("mean_line_length"))
        return mean_line_lengths

    def get_min_line_length(self, grouped_line_df:DataFrame, line_len_col_:str)->DataFrame:
        """get_min_line_length Returns the per-group minimum line length.

        Args:
            grouped_line_df (DataFrame): Grouped lines (a ``groupBy`` result).
            line_len_col_ (str): Column holding each line's length.

        Returns:
            DataFrame: One row per group with a ``min_line_length`` column.
        """
        min_line_lengths_col = grouped_line_df.agg(min(line_len_col_).alias("min_line_length"))
        return min_line_lengths_col

    def get_max_line_length(self, grouped_line_df:DataFrame, line_len_col_:str)->DataFrame:
        """get_max_line_length Returns the per-group maximum line length.

        Args:
            grouped_line_df (DataFrame): Grouped lines (a ``groupBy`` result).
            line_len_col_ (str): Column holding each line's length.

        Returns:
            DataFrame: One row per group with a ``max_line_length`` column.
        """
        max_line_lengths = grouped_line_df.agg(max(line_len_col_).alias("max_line_length"))
        return max_line_lengths

    def get_nsfw_words_count(self, grouped_line_df:DataFrame, line_nsfw_count_col_:str)->DataFrame:
        """get_nsfw_words_count Returns the per-group total NSFW word count.

        Args:
            grouped_line_df (DataFrame): Grouped lines (a ``groupBy`` result).
            line_nsfw_count_col_ (str): Column holding each line's NSFW word count.

        Returns:
            DataFrame: One row per group with an ``nsfw_words_count`` column.
        """
        nsfw_count = grouped_line_df.agg(sum(line_nsfw_count_col_).alias("nsfw_words_count"))
        return nsfw_count

    def get_non_li_words_count(self, grouped_line_df:DataFrame, line_non_li_count_col_:str)->DataFrame:
        """get_non_li_words_count Returns the per-group total non latin-indic word count.

        Args:
            grouped_line_df (DataFrame): Grouped lines (a ``groupBy`` result).
            line_non_li_count_col_ (str): Column holding each line's non latin-indic word count.

        Returns:
            DataFrame: One row per group with a ``non_li_char_count`` column.
        """
        # NOTE(review): the output alias is "non_li_char_count" even though the
        # input is described as a word count — downstream code keys on this
        # alias, so it is preserved as-is.
        non_li_count = grouped_line_df.agg(sum(line_non_li_count_col_).alias("non_li_char_count"))
        return non_li_count

    def get_bytes(self, grouped_line_df:DataFrame, line_bytes_col_:str)->DataFrame:
        """get_bytes Returns the per-group total byte count.

        Args:
            grouped_line_df (DataFrame): Grouped lines (a ``groupBy`` result).
            line_bytes_col_ (str): Column holding each line's byte count.

        Returns:
            DataFrame: One row per group with a ``bytes`` column.
        """
        bytes_ = grouped_line_df.agg(sum(line_bytes_col_).alias("bytes"))
        return bytes_

    def get_words_count(self, grouped_line_df:DataFrame, line_words_count_col_:str)->DataFrame:
        """get_words_count Returns the per-group total word count.

        Args:
            grouped_line_df (DataFrame): Grouped lines (a ``groupBy`` result).
            line_words_count_col_ (str): Column holding each line's word count.

        Returns:
            DataFrame: One row per group with a ``words_count`` column.
        """
        word_count = grouped_line_df.agg(sum(line_words_count_col_).alias("words_count"))
        return word_count

    def get_char_count(self, grouped_line_df:DataFrame, line_char_count_col_:str)->DataFrame:
        """get_char_count Returns the per-group total character count.

        Args:
            grouped_line_df (DataFrame): Grouped lines (a ``groupBy`` result).
            line_char_count_col_ (str): Column holding each line's character count.

        Returns:
            DataFrame: One row per group with a ``char_count`` column.
        """
        char_count = grouped_line_df.agg(sum(line_char_count_col_).alias("char_count"))
        return char_count

    def get_repeated_line_dist(self, line_df:DataFrame, id_col:str, text_col:str)->DataFrame:
        """get_repeated_line_dist Returns, per document, a map from each line's
        text to the number of times it occurs in that document.

        NOTE(review): despite the name, this computes occurrence counts, not a
        positional distance between repeated lines — confirm intent with callers.

        Args:
            line_df (DataFrame): Dataframe containing the individual lines.
            id_col (str): Column identifying the document each line belongs to.
            text_col (str): Column holding the line text.

        Returns:
            DataFrame: One row per ``id_col`` value with a ``repeated_line_dist``
            map column of line-text -> occurrence count.
        """
        col_name = "repeated_line_dist"

        # Count occurrences of each (document, line-text) pair, then collapse
        # the per-line single-entry maps into one map per document via the
        # keys/values arrays (map_from_arrays).
        repeated_line_dist = line_df.groupBy(id_col, text_col) \
                                    .agg(count("*").alias(col_name)) \
                                    .groupBy(id_col) \
                                    .agg(collect_list(create_map([text_col, col_name])).alias(col_name)) \
                                    .withColumn("keys", expr(f"transform({col_name}, x -> map_keys(x)[0])")) \
                                    .withColumn("values", expr(f"transform({col_name}, x -> map_values(x)[0])")) \
                                    .withColumn(col_name, map_from_arrays(col("keys"), col("values"))) \
                                    .drop("keys", "values")

        return repeated_line_dist

    def run_analysis(
        self,
        line_df:DataFrame,
        doc_id_col:str,
        line_nsfw_count_col_:str,
        line_non_li_count_col_:str,
        line_bytes_col_:str,
        line_words_count_col_:str,
        line_char_count_col_:str,
        only_base_stats:bool=False,
    ) -> DataFrame:
        """run_analysis Aggregates the various per-line stats into one row per document.

        Args:
            line_df (DataFrame): Dataframe containing the individual lines.
            doc_id_col (str): Column identifying the document each line belongs to.
            line_nsfw_count_col_ (str): Column holding each line's NSFW word count.
            line_non_li_count_col_ (str): Column holding each line's non latin-indic word count.
            line_bytes_col_ (str): Column holding each line's byte count.
            line_words_count_col_ (str): Column holding each line's word count.
            line_char_count_col_ (str): Column holding each line's character count.
            only_base_stats (bool, optional): If True, compute only bytes/words/chars
                and skip line-count, line-length and NSFW/non-LI stats. Defaults to False.

        Returns:
            DataFrame: One row per document with the computed statistic columns.
        """
        grouped_line_df = line_df.groupBy(doc_id_col)
        bytes_df = self.get_bytes(grouped_line_df, line_bytes_col_)
        words_count_df = self.get_words_count(grouped_line_df, line_words_count_col_)
        char_count_df = self.get_char_count(grouped_line_df, line_char_count_col_)

        doc_df = bytes_df \
                    .join(words_count_df, [doc_id_col]) \
                    .join(char_count_df, [doc_id_col])

        if not only_base_stats:
            num_lines_df = self.get_num_lines(grouped_line_df)
            nsfw_words_count_df = self.get_nsfw_words_count(grouped_line_df, line_nsfw_count_col_)
            non_li_words_count_df = self.get_non_li_words_count(grouped_line_df, line_non_li_count_col_)
            # Line "length" here is measured in words: the mean/min/max helpers
            # aggregate the literal "words_count" column of line_df.
            mean_line_len_df = self.get_mean_line_length(grouped_line_df, "words_count")
            min_line_len_df = self.get_min_line_length(grouped_line_df, "words_count")
            max_line_len_df = self.get_max_line_length(grouped_line_df, "words_count")

            doc_df = doc_df \
                        .join(num_lines_df, [doc_id_col]) \
                        .join(mean_line_len_df, [doc_id_col]) \
                        .join(min_line_len_df, [doc_id_col]) \
                        .join(max_line_len_df, [doc_id_col]) \
                        .join(nsfw_words_count_df, [doc_id_col]) \
                        .join(non_li_words_count_df, [doc_id_col])

        return doc_df

    def run_flagging(
        self,
        doc_df:DataFrame,
        word_count_col:str,
        char_count_col:str,
        nsfw_count_col:str,
        nsfw_threshold:float,
        non_li_count_col:str,
        non_li_threshold:float,
        min_line_count:int,
        line_count_col:str,
        min_mean_line_len:int,
        mean_line_len_col:str,
    )->DataFrame:
        """run_flagging Adds boolean flag columns based on computed document statistics.

        All comparisons are inclusive: a document exactly at a threshold is flagged.

        Args:
            doc_df (DataFrame): Dataframe with one row per document and its statistics.
            word_count_col (str): Column holding the document's total word count.
            char_count_col (str): Column holding the document's total character count.
            nsfw_count_col (str): Column holding the document's NSFW word count.
            nsfw_threshold (float): NSFW-to-word ratio at or above which a document is flagged.
            non_li_count_col (str): Column holding the document's non latin-indic count.
            non_li_threshold (float): Non-LI-to-char ratio at or above which a document is flagged.
            min_line_count (int): Line count at or below which a document is flagged.
            line_count_col (str): Column holding the document's line count.
            min_mean_line_len (int): Mean line length at or below which a document is flagged.
            mean_line_len_col (str): Column holding the document's mean line length.

        Returns:
            DataFrame: ``doc_df`` with the boolean columns ``has_less_lines``,
            ``is_short_lines_heavy``, ``is_nsfw_heavy`` and ``is_non_li_heavy`` appended.
        """
        doc_df = doc_df \
                .select("*", when(doc_df[line_count_col] <= min_line_count, True).otherwise(False).alias("has_less_lines")) \
                .select("*", when(doc_df[mean_line_len_col] <= min_mean_line_len, True).otherwise(False).alias("is_short_lines_heavy")) \
                .select("*", when(doc_df[nsfw_count_col]/doc_df[word_count_col] >= nsfw_threshold, True).otherwise(False).alias("is_nsfw_heavy")) \
                .select("*", when(doc_df[non_li_count_col]/doc_df[char_count_col] >= non_li_threshold, True).otherwise(False).alias("is_non_li_heavy"))

        return doc_df

SparkOptimizedHandlers: A handler class providing utilities that manipulate dataframes and compute various document statistics.

def get_num_lines(self, grouped_line_df: pyspark.sql.dataframe.DataFrame) -> int:
149    def get_num_lines(self, grouped_line_df:DataFrame)->int:
150        """get_num_lines Method that returns the number of lines present in the dataframe.
151
152        Args:
153            grouped_line_df (DataFrame): Dataframe object containing the grouped lines.
154
155        Returns:
156            int: Value representing the number of lines.
157        """
158        lines_count = grouped_line_df.agg(count("*").alias("lines_count"))
159        return lines_count

get_num_lines Method that returns the number of lines present in the dataframe.

Arguments:
  • grouped_line_df (DataFrame): Dataframe object containing the grouped lines.
Returns:

int: Value representing the number of lines.

def get_mean_line_length( self, grouped_line_df: pyspark.sql.dataframe.DataFrame, line_len_col_: str) -> int:
161    def get_mean_line_length(self, grouped_line_df:DataFrame, line_len_col_:str)->int:
162        """get_mean_line_length Method that returns the mean line length of all the lines present in the dataframe.
163
164        Args:
165            grouped_line_df (DataFrame): Dataframe object containing the grouped lines.
166            line_len_col_ (str): Column that represents the line length in a document.
167
168        Returns:
169            int: Value representing the mean line length.
170        """
171        mean_line_lengths = grouped_line_df.agg(avg(line_len_col_).alias("mean_line_length"))
172        return mean_line_lengths

get_mean_line_length Method that returns the mean line length of all the lines present in the dataframe.

Arguments:
  • grouped_line_df (DataFrame): Dataframe object containing the grouped lines.
  • line_len_col_ (str): Column that represents the line length in a document.
Returns:

int: Value representing the mean line length.

def get_min_line_length( self, grouped_line_df: pyspark.sql.dataframe.DataFrame, line_len_col_: str) -> int:
174    def get_min_line_length(self, grouped_line_df:DataFrame, line_len_col_:str)->int:
175        """get_min_line_length Method that returns the min line length of all the lines present in the dataframe.
176
177        Args:
178            grouped_line_df (DataFrame): Dataframe object containing the grouped lines.
179            line_len_col_ (str): Column that represents the line length in a document.
180
181        Returns:
182            int: Value representing the min line length.
183        """
184        min_line_lengths_col = grouped_line_df.agg(min(line_len_col_).alias("min_line_length"))
185        return min_line_lengths_col

get_min_line_length Method that returns the min line length of all the lines present in the dataframe.

Arguments:
  • grouped_line_df (DataFrame): Dataframe object containing the grouped lines.
  • line_len_col_ (str): Column that represents the line length in a document.
Returns:

int: Value representing the min line length.

def get_max_line_length( self, grouped_line_df: pyspark.sql.dataframe.DataFrame, line_len_col_: str) -> int:
187    def get_max_line_length(self, grouped_line_df:DataFrame, line_len_col_:str)->int:
188        """get_max_line_length Method that returns the max line length of all the lines present in the dataframe.
189
190        Args:
191            grouped_line_df (DataFrame): Dataframe object containing the grouped lines.
192            line_len_col_ (str): Column name that contains the line length for the various document lines.
193
194        Returns:
195            int: Value representing the max line length.
196        """
197        max_line_lengths = grouped_line_df.agg(max(line_len_col_).alias("max_line_length"))
198        return max_line_lengths

get_max_line_length Method that returns the max line length of all the lines present in the dataframe.

Arguments:
  • grouped_line_df (DataFrame): Dataframe object containing the grouped lines.
  • line_len_col_ (str): Column name that contains the line length for the various document lines.
Returns:

int: Value representing the max line length.

def get_nsfw_words_count( self, grouped_line_df: pyspark.sql.dataframe.DataFrame, line_nsfw_count_col_: str) -> int:
200    def get_nsfw_words_count(self, grouped_line_df:DataFrame, line_nsfw_count_col_:str)->int:
201        """get_nsfw_words_count Method that returns the number of NSFW words present in the dataframe.
202
203        Args:
204            grouped_line_df (DataFrame): Dataframe object containing the grouped lines.
205            line_nsfw_count_col_ (str): Column name that contains the nsfw word count for the various document lines.
206
207        Returns:
208            int: Value representing the total nsfw word count.
209        """
210        nsfw_count = grouped_line_df.agg(sum(line_nsfw_count_col_).alias("nsfw_words_count"))
211        return nsfw_count

get_nsfw_words_count Method that returns the number of NSFW words present in the dataframe.

Arguments:
  • grouped_line_df (DataFrame): Dataframe object containing the grouped lines.
  • line_nsfw_count_col_ (str): Column name that contains the nsfw word count for the various document lines.
Returns:

int: Value representing the total nsfw word count.

def get_non_li_words_count( self, grouped_line_df: pyspark.sql.dataframe.DataFrame, line_non_li_count_col_: str) -> int:
213    def get_non_li_words_count(self, grouped_line_df:DataFrame, line_non_li_count_col_:str)->int:
214        """get_non_li_words_count Method that returns the number of non latin-indic words in the dataframe.
215
216        Args:
217            grouped_line_df (DataFrame): Dataframe object containing the grouped lines.
218            line_non_li_count_col_ (str): Column name that contains the non-li word count for the various document lines.
219
220        Returns:
221            int: Value representing the total non-latin indic word count.
222        """
223        non_li_count = grouped_line_df.agg(sum(line_non_li_count_col_).alias("non_li_char_count"))
224        return non_li_count

get_non_li_words_count Method that returns the number of non latin-indic words in the dataframe.

Arguments:
  • grouped_line_df (DataFrame): Dataframe object containing the grouped lines.
  • line_non_li_count_col_ (str): Column name that contains the non-li word count for the various document lines.
Returns:

int: Value representing the total non-latin indic word count.

def get_bytes( self, grouped_line_df: pyspark.sql.dataframe.DataFrame, line_bytes_col_: str) -> int:
226    def get_bytes(self, grouped_line_df:DataFrame, line_bytes_col_:str)->int:
227        """get_bytes Method that returns the total bytes that represent the data present in the dataframe.
228
229        Args:
230            grouped_line_df (DataFrame): Dataframe object containing the grouped lines.
231            line_bytes_col_ (str): Column name that contains the total bytes for the various document lines.
232
233        Returns:
234            int: Value representing the total bytes of data present in the dataframe.
235        """
236        bytes_ = grouped_line_df.agg(sum(line_bytes_col_).alias("bytes"))
237        return bytes_

get_bytes Method that returns the total bytes that represent the data present in the dataframe.

Arguments:
  • grouped_line_df (DataFrame): Dataframe object containing the grouped lines.
  • line_bytes_col_ (str): Column name that contains the total bytes for the various document lines.
Returns:

int: Value representing the total bytes of data present in the dataframe.

def get_words_count( self, grouped_line_df: pyspark.sql.dataframe.DataFrame, line_words_count_col_: str) -> int:
239    def get_words_count(self, grouped_line_df:DataFrame, line_words_count_col_:str)->int:
240        """get_words_count Method that returns the total word count present in the dataframe.
241
242        Args:
243            grouped_line_df (DataFrame): Dataframe object containing the grouped lines.
244            line_words_count_col_ (str): Column name that contains the word count of the various document lines.
245
246        Returns:
247            int: Value representing the total word count in the dataframe.
248        """
249        word_count = grouped_line_df.agg(sum(line_words_count_col_).alias("words_count"))
250        return word_count

get_words_count Method that returns the total word count present in the dataframe.

Arguments:
  • grouped_line_df (DataFrame): Dataframe object containing the grouped lines.
  • line_words_count_col_ (str): Column name that contains the word count of the various document lines.
Returns:

int: Value representing the total word count in the dataframe.

def get_char_count( self, grouped_line_df: pyspark.sql.dataframe.DataFrame, line_char_count_col_: str) -> int:
252    def get_char_count(self, grouped_line_df:DataFrame, line_char_count_col_:str)->int:
253        """get_char_count Method that returns the total char count present in the dataframe.
254
255        Args:
256            grouped_line_df (DataFrame): Dataframe object containing the grouped lines.
257            line_char_count_col_ (str): Column name that contains the char count of the various document lines.
258
259        Returns:
260            int: Value representing the total character count in the dataframe.
261        """
262        char_count = grouped_line_df.agg(sum(line_char_count_col_).alias("char_count"))
263        return char_count

get_char_count Method that returns the total char count present in the dataframe.

Arguments:
  • grouped_line_df (DataFrame): Dataframe object containing the grouped lines.
  • line_char_count_col_ (str): Column name that contains the char count of the various document lines.
Returns:

int: Value representing the total character count in the dataframe.

def get_repeated_line_dist( self, line_df: pyspark.sql.dataframe.DataFrame, id_col: str, text_col: str) -> int:
265    def get_repeated_line_dist(self, line_df:DataFrame, id_col:str, text_col:str)->int:
266        """get_repeated_line_dist Method that returns the distance between the closest repeated lines.
267
268        Args:
269            line_df (DataFrame): Dataframe object containing the lines.
270            id_col (str): The column based on which the dataframe needs to be grouped by.
271            text_col (str): The column name for the text in the dataframe.
272
273        Returns:
274            int: Returns the distance between repeated lines
275        """
276        col_name = "repeated_line_dist"
277
278        repeated_line_dist = line_df.groupBy(id_col, text_col) \
279                                    .agg(count("*").alias(col_name)) \
280                                    .groupBy(id_col) \
281                                    .agg(collect_list(create_map([text_col, col_name])).alias(col_name)) \
282                                    .withColumn("keys", expr(f"transform({col_name}, x -> map_keys(x)[0])")) \
283                                    .withColumn("values", expr(f"transform({col_name}, x -> map_values(x)[0])")) \
284                                    .withColumn(col_name, map_from_arrays(col("keys"), col("values"))) \
285                                    .drop("keys", "values")
286
287        return repeated_line_dist

get_repeated_line_dist Method that returns, per document, a map from each line's text to the number of times it occurs in that document.

Arguments:
  • line_df (DataFrame): Dataframe object containing the lines.
  • id_col (str): The column based on which the dataframe needs to be grouped by.
  • text_col (str): The column name for the text in the dataframe.
Returns:

int: Returns the distance between repeated lines

def run_analysis( self, line_df: pyspark.sql.dataframe.DataFrame, doc_id_col: str, line_nsfw_count_col_: str, line_non_li_count_col_: str, line_bytes_col_: str, line_words_count_col_: str, line_char_count_col_: str, only_base_stats: bool = False) -> pyspark.sql.dataframe.DataFrame:
289    def run_analysis(
290        self,
291        line_df:DataFrame,
292        doc_id_col:str,
293        line_nsfw_count_col_:str,
294        line_non_li_count_col_:str,
295        line_bytes_col_:str,
296        line_words_count_col_:str,
297        line_char_count_col_:str,
298        only_base_stats:bool=False,
299    ) -> DataFrame:
300        """run_analysis Method that runs the analysis and aggregates the various stats for the dataframe.
301
302        Args:
303            line_df (DataFrame): Dataframe object containing the lines.
304            doc_id_col (str): The column based on which the dataframe needs to be grouped by.
305            line_nsfw_count_col_ (str): Column name that contains the nsfw word count of the various document lines.
306            line_non_li_count_col_ (str): Column name that contains the non latin-indic word count of the various document lines.
307            line_bytes_col_ (str): Column name that contains the byte count of the various document lines.
308            line_words_count_col_ (str): Column name that contains the word count of the various document lines.
309            line_char_count_col_ (str): Column name that contains the character count of the various document lines.
310            only_base_stats (bool, optional): If only return the basic statistic values. Defaults to False.
311
312        Returns:
313            DataFrame: Returns the dataframe with computed statistic values.
314        """
315        grouped_line_df = line_df.groupBy(doc_id_col)
316        bytes_df = self.get_bytes(grouped_line_df, line_bytes_col_)
317        words_count_df = self.get_words_count(grouped_line_df, line_words_count_col_)
318        char_count_df = self.get_char_count(grouped_line_df, line_char_count_col_)
319
320        doc_df = bytes_df \
321                    .join(words_count_df, [doc_id_col]) \
322                    .join(char_count_df, [doc_id_col])
323
324        if not only_base_stats:
325            num_lines_df = self.get_num_lines(grouped_line_df)
326            nsfw_words_count_df = self.get_nsfw_words_count(grouped_line_df, line_nsfw_count_col_)
327            non_li_words_count_df = self.get_non_li_words_count(grouped_line_df, line_non_li_count_col_)
328            mean_line_len_df = self.get_mean_line_length(grouped_line_df, "words_count")
329            min_line_len_df = self.get_min_line_length(grouped_line_df, "words_count")
330            max_line_len_df = self.get_max_line_length(grouped_line_df, "words_count")
331
332            doc_df = doc_df \
333                        .join(num_lines_df, [doc_id_col]) \
334                        .join(mean_line_len_df, [doc_id_col]) \
335                        .join(min_line_len_df, [doc_id_col]) \
336                        .join(max_line_len_df, [doc_id_col]) \
337                        .join(nsfw_words_count_df, [doc_id_col]) \
338                        .join(non_li_words_count_df, [doc_id_col])
339        
340        return doc_df

run_analysis Method that runs the analysis and aggregates the various stats for the dataframe.

Arguments:
  • line_df (DataFrame): Dataframe object containing the lines.
  • doc_id_col (str): The column based on which the dataframe needs to be grouped by.
  • line_nsfw_count_col_ (str): Column name that contains the nsfw word count of the various document lines.
  • line_non_li_count_col_ (str): Column name that contains the non latin-indic word count of the various document lines.
  • line_bytes_col_ (str): Column name that contains the byte count of the various document lines.
  • line_words_count_col_ (str): Column name that contains the word count of the various document lines.
  • line_char_count_col_ (str): Column name that contains the character count of the various document lines.
  • only_base_stats (bool, optional): If only return the basic statistic values. Defaults to False.
Returns:

DataFrame: Returns the dataframe with computed statistic values.

def run_flagging( self, doc_df: pyspark.sql.dataframe.DataFrame, word_count_col: str, char_count_col: str, nsfw_count_col: str, nsfw_threshold: float, non_li_count_col: str, non_li_threshold: float, min_line_count: int, line_count_col: str, min_mean_line_len: int, mean_line_len_col: str) -> pyspark.sql.dataframe.DataFrame:
def run_flagging(
    self, 
    doc_df:DataFrame,
    word_count_col:str,
    char_count_col:str,
    nsfw_count_col:str,
    nsfw_threshold:float,
    non_li_count_col:str, 
    non_li_threshold:float,
    min_line_count:int,
    line_count_col:str,
    min_mean_line_len:int,
    mean_line_len_col:str,
)->DataFrame:
    """run_flagging Method that executes the flagging stage based on computed document statistics.

    Flags are computed as boolean columns appended to the dataframe:
    - ``has_less_lines``: line count <= min_line_count
    - ``is_short_lines_heavy``: mean line length <= min_mean_line_len
    - ``is_nsfw_heavy``: nsfw word ratio (nsfw / words) >= nsfw_threshold
    - ``is_non_li_heavy``: non latin-indic char ratio (non_li / chars) >= non_li_threshold

    Args:
        doc_df (DataFrame): The dataframe object containing the various documents.
        word_count_col (str): Column name that contains the word count of the various document lines.
        char_count_col (str): Column name that contains the character count of the various document lines.
        nsfw_count_col (str): Column name that contains the nsfw word count of the various document lines.
        nsfw_threshold (float): Threshold value for number of NSFW words acceptable.
        non_li_count_col (str): Column name that contains the non latin-indic word count of the various document lines.
        non_li_threshold (float): Threshold value for number of non latin-indic words.
        min_line_count (int): Threshold value for minimum number of lines to constitute a document.
        line_count_col (str): Column name that contains the line count of the various documents.
        min_mean_line_len (int): Threshold value for the mean line length.
        mean_line_len_col (str): Column name that contains the mean line length of the various document lines.

    Returns:
        DataFrame: The input dataframe with the four boolean flag columns appended.
    """
    # Each select keeps all existing columns ("*") and appends one flag column.
    # NOTE: the when() conditions intentionally reference the original doc_df
    # columns; only statistics columns already present before this call are used.
    doc_df = doc_df \
            .select("*", when(doc_df[line_count_col] <= min_line_count, True).otherwise(False).alias("has_less_lines")) \
            .select("*", when(doc_df[mean_line_len_col] <= min_mean_line_len, True).otherwise(False).alias("is_short_lines_heavy")) \
            .select("*", when(doc_df[nsfw_count_col]/doc_df[word_count_col] >= nsfw_threshold, True).otherwise(False).alias("is_nsfw_heavy")) \
            .select("*", when(doc_df[non_li_count_col]/doc_df[char_count_col] >= non_li_threshold, True).otherwise(False).alias("is_non_li_heavy"))
    
    return doc_df

run_flagging Method that executes the flagging stage based on computed document statistics.

Arguments:
  • doc_df (DataFrame): The dataframe object containing the various documents.
  • word_count_col (str): Column name that contains the word count of the various document lines.
  • char_count_col (str): Column name that contains the character count of the various document lines.
  • nsfw_count_col (str): Column name that contains the nsfw word count of the various document lines.
  • nsfw_threshold (float): Threshold value for number of NSFW words acceptable.
  • non_li_count_col (str): Column name that contains the non latin-indic word count of the various document lines.
  • non_li_threshold (float): Threshold value for number of non latin-indic words.
  • min_line_count (int): Threshold value for minimum number of lines to constitute a document.
  • line_count_col (str): Column name that contains the line count of the various documents.
  • min_mean_line_len (int): Threshold value for the mean line length.
  • mean_line_len_col (str): Column name that contains the mean line length of the various document lines.
Returns:

DataFrame: Returns the input dataframe with the boolean flag columns (has_less_lines, is_short_lines_heavy, is_nsfw_heavy, is_non_li_heavy) appended.