diff --git a/README.md b/README.md index 68a1964..805163c 100644 --- a/README.md +++ b/README.md @@ -536,8 +536,8 @@ Only a few of the functions are covered by unit tests. Functions involving closu | acos | ![done] | | | acosh | ![done] | | | add_months | ![done] | | -| aes_decrypt | ![open] | | -| aes_encrypt | ![open] | | +| aes_decrypt | ![done] | | +| aes_encrypt | ![done] | | | aggregate | ![open] | | | any_value | ![done] | | | approx_count_distinct | ![done] | | diff --git a/crates/connect/src/functions/mod.rs b/crates/connect/src/functions/mod.rs index f0ff58b..3fc58cb 100644 --- a/crates/connect/src/functions/mod.rs +++ b/crates/connect/src/functions/mod.rs @@ -1,5 +1,7 @@ //! A re-implementation of Spark functions +use std::collections::HashMap; + use crate::expressions::VecExpression; use crate::spark; use crate::DataFrame; @@ -58,6 +60,20 @@ macro_rules! gen_func { }; } +pub(crate) fn options_to_map(cols: I) -> Column +where + I: IntoIterator, + K: AsRef, + V: AsRef, +{ + let map: Vec = cols + .into_iter() + .flat_map(|(k, v)| vec![lit(k.as_ref()), lit(v.as_ref())]) + .collect(); + + create_map(map) +} + // Normal Functions /// Returns a [Column] based on the given column name. @@ -189,8 +205,8 @@ pub fn negate(col: impl Into) -> Column { gen_func!(pi, [], "Returns Pi."); gen_func!(pmod, [dividend: Column, divisor: Column], "Returns the positive value of dividend mod divisor."); - gen_func!(power, [col1: Column, col2: Column], "Returns the value of the first argument raised to the power of the second argument."); +gen_func!(positive, [col: Column], "Returns the value."); /// Returns the value of the first argument raised to the power of the second argument. pub fn pow(col1: impl Into, col2: impl Into) -> Column { @@ -214,13 +230,20 @@ pub fn bround(col: impl Into, scale: Option) -> Column { gen_func!(sec, [col: Column], "Computes secant of the input column."); gen_func!(shiftleft, [col: Column, num_bits: Column], "Shift the given value numBits left."); gen_func!(shiftright, [col: Column, num_bits: Column], "(Signed) shift the given value numBits right."); -gen_func!(shiftrightunsighed, [col: Column, num_bits: Column], "(Signed) shift the given value numBits right."); +gen_func!(shiftrightunsigned, [col: Column, num_bits: Column], "(Signed) shift the given value numBits right."); gen_func!(sign, [col: Column], "Computes the signum of the given value."); gen_func!(signum, [col: Column], "Computes the signum of the given value."); gen_func!(sin, [col: Column], "Computes sine of the input column."); gen_func!(sinh, [col: Column], "Computes hyperbolic sine of the input column."); gen_func!(tan, [col: Column], "Computes tangent of the input column."); gen_func!(tanh, [col: Column], "Computes hyperbolic tangent of the input column."); +gen_func!(try_add, [left: Column, right: Column], "Returns the sum of left and right and the result is null on overflow."); +gen_func!(try_avg, [col: Column], "Returns the mean calculated from values of a group and the result is null on overflow."); +gen_func!(try_divide, [left: Column, right: Column], "Returns dividend/divisor."); +gen_func!(try_multiply, [left: Column, right: Column], "Returns left*right and the result is null on overflow."); +gen_func!(try_subtract, [left: Column, right: Column], "Returns left-right and the result is null on overflow."); +gen_func!(try_sum, [left: Column, right: Column], "Returns sum calculated from values of a group and the result is null on overflow."); + gen_func!(degrees, [col: Column], "Converts an angle measured in radians to an approximately equivalent angle measured in degrees."); gen_func!(radians, [col: Column], "Converts an angle measured in degrees to an approximately equivalent angle measured in radians."); @@ -593,26 +616,58 @@ gen_func!(from_utc_timestamp, [timestamp: Column, tz: Column], "This is a common gen_func!(to_utc_timestamp, [timestamp: Column, tz: Column], "This is a common function for databases supporting TIMESTAMP WITHOUT TIMEZONE."); gen_func!(weekday, [col: Column], "Returns the day of the week for date/timestamp (0 = Monday, 1 = Tuesday, …, 6 = Sunday)."); -#[allow(dead_code)] -#[allow(unused_variables)] -fn window(col: Column) -> Column { - unimplemented!() -} +/// Bucketize rows into one or moe time windows given a timestamp specifying column. +pub fn window( + time_column: impl Into, + window_duration: &str, + slide_duration: Option<&str>, + start_time: Option<&str>, +) -> Column { + let window_duration = lit(window_duration); -#[allow(dead_code)] -#[allow(unused_variables)] -fn session_window(col: Column) -> Column { - unimplemented!() + if slide_duration.is_some() & start_time.is_some() { + invoke_func( + "window", + vec![ + time_column.into(), + window_duration, + lit(slide_duration.unwrap()), + lit(start_time.unwrap()), + ], + ) + } else if slide_duration.is_some() & start_time.is_none() { + invoke_func( + "window", + vec![ + time_column.into(), + window_duration, + lit(slide_duration.unwrap()), + ], + ) + } else if slide_duration.is_none() & start_time.is_some() { + invoke_func( + "window", + vec![ + time_column.into(), + window_duration, + lit(start_time.unwrap()), + ], + ) + } else { + invoke_func("window", vec![time_column.into(), window_duration]) + } } +gen_func!(session_window, [time_column: Column, gap_duration: Column], "Generates session window given a timestamp specifying column."); gen_func!(timestamp_micros, [col: Column], "Creates timestamp from the number of microseconds since UTC epoch."); gen_func!(timestamp_millis, [col: Column], "Creates timestamp from the number of milliseconds since UTC epoch."); gen_func!(timestamp_seconds, [col: Column], "Converts the number of seconds from the Unix epoch (1970-01-01T00:00:00Z) to a timestamp."); -#[allow(dead_code)] -#[allow(unused_variables)] -fn try_to_timestamp(col: Column) -> Column { - unimplemented!() +pub fn try_to_timestamp(col: impl Into, format: Option>) -> Column { + match format { + Some(val) => invoke_func("try_to_timestamp", vec![col.into(), val.into()]), + None => invoke_func("try_to_timestamp", vec![col.into()]), + } } gen_func!(unix_date, [col: Column], "Returns the number of days since 1970-01-01."); @@ -642,7 +697,15 @@ pub fn array_join( } } -gen_func!(create_map, [cols: _], "Creates a new map column."); +/// Create a new map column. +pub fn create_map(cols: I) -> Column +where + I: IntoIterator, + I::Item: Into, +{ + invoke_func("map", cols) +} + gen_func!(slice, [x: Column, start: Column, length: Column], "Returns an array containing all the elements in x from index start (array indices start at 1, or from the end if start is negative) with the specified length."); gen_func!(concat, [cols: _], "Concatenates multiple input columns together into a single column."); @@ -696,6 +759,38 @@ where invoke_func("json_tuple", args) } +/// Parses a column containing a JSON string to a row with the specific schema. +/// Returns null in the case of an unparseable string +pub fn from_json( + col: impl Into, + schema: impl Into, + options: Option>, +) -> Column { + match options { + Some(opts) => invoke_func( + "from_json", + vec![col.into(), schema.into(), options_to_map(opts)], + ), + None => invoke_func("from_json", vec![col.into(), schema.into()]), + } +} + +/// Parses a JSON string and infers its schema in DDL format +pub fn schema_of_json(json: impl Into, options: Option>) -> Column { + match options { + Some(opts) => invoke_func("schema_of_json", vec![json.into(), options_to_map(opts)]), + None => invoke_func("schema_of_json", vec![json.into()]), + } +} + +/// Converts a column containing a StructType into a JSON string +pub fn to_json(col: impl Into, options: Option>) -> Column { + match options { + Some(opts) => invoke_func("to_json", vec![col.into(), options_to_map(opts)]), + None => invoke_func("to_json", vec![col.into()]), + } +} + gen_func!(json_array_length, [col: Column], "Returns the number of elements in the outermost JSON array."); gen_func!(json_object_keys, [col: Column], "Returns all the keys of the outermost JSON object as an array."); gen_func!(size, [col: Column], "Returns the length of the array or map stored in the column."); @@ -744,10 +839,54 @@ gen_func!(map_entries, [col: Column], "Returns an unordered array of all entries gen_func!(map_from_entries, [col: Column], "Converts an array of entries (key value struct types) to a map of values."); gen_func!(arrays_zip, [cols: _], "Returns a merged array of structs in which the N-th struct contains all N-th values of input arrays."); gen_func!(map_concat, [cols: _], "Returns the union of all the given maps."); + +/// Parses a column containing a CSV string to a row with the specific schema. +/// Returns null in the case of an unparseable string +pub fn from_csv( + col: impl Into, + schema: impl Into, + options: Option>, +) -> Column { + match options { + Some(opts) => invoke_func( + "from_csv", + vec![col.into(), schema.into(), options_to_map(opts)], + ), + None => invoke_func("from_csv", vec![col.into(), schema.into()]), + } +} + +/// Parses a CSV string and infers its schema in DDL format +pub fn schema_of_csv(csv: impl Into, options: Option>) -> Column { + match options { + Some(opts) => invoke_func("schema_of_csv", vec![csv.into(), options_to_map(opts)]), + None => invoke_func("schema_of_csv", vec![csv.into()]), + } +} + +/// Create a map after splitting the text into key/value pairs using delimiters +pub fn str_to_map( + text: impl Into, + pair_delim: Option>, + key_value_delim: Option>, +) -> Column { + let pair_delim = pair_delim.map(Into::into).unwrap_or_else(|| lit(",")); + let key_value_delim = key_value_delim.map(Into::into).unwrap_or_else(|| lit(":")); + + invoke_func("str_to_map", vec![text.into(), pair_delim, key_value_delim]) +} + +/// Converts a column containing a StructType into a CSV string +pub fn to_csv(col: impl Into, options: Option>) -> Column { + match options { + Some(opts) => invoke_func("to_csv", vec![col.into(), options_to_map(opts)]), + None => invoke_func("to_csv", vec![col.into()]), + } +} + gen_func!(try_element_at, [col: Column, extraction: Column], "Returns element of array at given (1-based) index."); // Partition Transformations - gen_func!(years, [col: Column], "A transform for timestamps and dates to partition data into years."); gen_func!(months, [col: Column], "A transform for timestamps and dates to partition data into months."); gen_func!(days, [col: Column], "A transform for timestamps and dates to partition data into days."); @@ -785,6 +924,35 @@ gen_func!(collect_list, [col: Column], "Returns a list of objects with duplicate gen_func!(corr, [col1: Column, col2: Column], "Returns a new Column for the Pearson Correlation Coefficient for col1 and col2."); +gen_func!(count, [col: Column], "Returns the number of items in a group."); + +// Returns a new Column for distinct count of col or cols +pub fn count_distinct(col: impl Into, cols: Option) -> Column +where + I: IntoIterator, + I::Item: Into, +{ + let mut cols = match cols { + Some(val) => VecExpression::from_iter(val).expr, + None => vec![], + }; + + let mut expr = vec![col.into().expression]; + + expr.append(&mut cols); + + Column::from(spark::Expression { + expr_type: Some(spark::expression::ExprType::UnresolvedFunction( + spark::expression::UnresolvedFunction { + function_name: "count".to_string(), + arguments: VecExpression::from_iter(expr).into(), + is_distinct: true, + is_user_defined_function: false, + }, + )), + }) +} + /// Returns a count-min sketch of a column with the given esp, confidence and seed. pub fn count_min_sketch( col: impl Into, @@ -941,6 +1109,47 @@ gen_func!( "Returns the rank of rows within a window partition, without any gaps" ); +/// Returns the value that os offset rows before the current row, and default is there is less +/// than offset rows before the current row +pub fn lag( + col: impl Into, + offset: Option>, + default: Option>, +) -> Column { + let offset = offset.map(Into::into).unwrap_or_else(|| lit(1)); + + match default { + Some(val) => invoke_func("lag", vec![col.into(), offset, val.into()]), + None => invoke_func("lag", vec![col.into(), offset]), + } +} + +/// Returns the value that os offset rows after the current row, and default is there is less +/// than offset rows after the current row +pub fn lead( + col: impl Into, + offset: Option>, + default: Option>, +) -> Column { + let offset = offset.map(Into::into).unwrap_or_else(|| lit(1)); + + match default { + Some(val) => invoke_func("lead", vec![col.into(), offset, val.into()]), + None => invoke_func("lead", vec![col.into(), offset]), + } +} + +pub fn nth_value( + col: impl Into, + offset: impl Into, + ignore_nulls: Option, +) -> Column { + match ignore_nulls { + Some(val) => invoke_func("nth_value", vec![col.into(), offset.into(), lit(val)]), + None => invoke_func("nth_value", vec![col.into(), offset.into()]), + } +} + /// Returns the ntile group id (from 1 to n inclusive) in an ordered window partition. pub fn ntile(n: i32) -> Column { invoke_func("ntitle", vec![lit(n)]) @@ -994,9 +1203,33 @@ pub fn desc_nulls_last(col: impl Into) -> Column { gen_func!(ascii, [col: Column], "Computes the numeric value of the first character of the string column."); gen_func!(base64, [col: Column], "Computes the BASE64 encoding of a binary column and returns it as a string column."); gen_func!(bit_length, [col: Column], "Calculates the bit length for the specified string column."); + +// Remove the leading and trailing *trim* characters from *str* +pub fn btrim(str: impl Into, trim: Option>) -> Column { + match trim { + Some(val) => invoke_func("btrim", vec![str.into(), val.into()]), + None => invoke_func("btrim", vec![str.into()]), + } +} + gen_func!(char, [col: Column], "Returns the ASCII character having the binary equivalent to col."); gen_func!(character_length, [str: Column], "Returns the character length of string data or number of bytes of binary data."); gen_func!(char_length, [str: Column], "Returns the character length of string data or number of bytes of binary data."); + +// Concatenates multiple input string columns together into a single string column, using the given +// separator +pub fn concat_ws(sep: &str, cols: I) -> Column +where + I: IntoIterator, + I::Item: Into, +{ + let mut cols = VecExpression::from_iter(cols).expr; + let mut expr = vec![lit(sep).expression]; + expr.append(&mut cols); + + invoke_func("concat_ws", expr) +} + gen_func!(contains, [left: Column, right: Column], "Returns a boolean."); gen_func!(decode, [col: Column, charset: Column], "Computes the first argument into a string from a binary using the provided character set (one of ‘US-ASCII’, ‘ISO-8859-1’, ‘UTF-8’, ‘UTF-16BE’, ‘UTF-16LE’, ‘UTF-16’)."); gen_func!(elt, [cols: _], "Returns the n-th input, e.g., returns input2 when n is 2."); @@ -1004,24 +1237,177 @@ gen_func!(encode, [col: Column, charset: Column], "Computes the first argument i gen_func!(endswith, [str: Column, suffix: Column], "Returns a boolean."); gen_func!(find_in_set, [str: Column, str_array: Column], "Returns the index (1-based) of the given string (str) in the comma-delimited list (strArray)."); gen_func!(format_number, [col: Column, d: Column], "Formats the number X to a format like ‘#,–#,–#.–’, rounded to d decimal places with HALF_EVEN round mode, and returns the result as a string."); + +// Formats the arguments in printf-style and returns the result as a string column +pub fn format_string(format: &str, cols: I) -> Column +where + I: IntoIterator, + I::Item: Into, +{ + let mut cols = VecExpression::from_iter(cols).expr; + let mut expr = vec![lit(format).expression]; + expr.append(&mut cols); + + invoke_func("format_string", expr) +} + +pub fn ilike( + str: impl Into, + pattern: impl Into, + escape_char: Option, +) -> Column { + match escape_char { + Some(val) => invoke_func("ilike", vec![str.into(), pattern.into(), val]), + None => invoke_func("ilike", vec![str.into(), pattern.into()]), + } +} + gen_func!(initcap, [col: Column], "Translate the first letter of each word to upper case in the sentence."); gen_func!(instr, [str: Column, substr: Column], "Locate the position of the first occurrence of substr column in the given string."); gen_func!(lcase, [str: Column], "Returns str with all characters changed to lowercase."); gen_func!(length, [col: Column], "Computes the character length of string data or number of bytes of binary data."); + +pub fn like( + str: impl Into, + pattern: impl Into, + escape_char: Option, +) -> Column { + match escape_char { + Some(val) => invoke_func("like", vec![str.into(), pattern.into(), val]), + None => invoke_func("like", vec![str.into(), pattern.into()]), + } +} + gen_func!(lower, [col: Column], "Converts a string expression to lower case."); gen_func!(left, [str: Column, len: Column], "Returns the leftmost len`(`len can be string type) characters from the string str, if len is less or equal than 0 the result is an empty string."); + +pub fn levenshtein( + left: impl Into, + right: impl Into, + threshold: Option, +) -> Column { + match threshold { + Some(val) => invoke_func("levenshtein", vec![left.into(), right.into(), lit(val)]), + None => invoke_func("levenshtein", vec![left.into(), right.into()]), + } +} + +pub fn locate(substr: impl Into, str: impl Into, pos: Option) -> Column { + match pos { + Some(val) => invoke_func("locate", vec![substr.into(), str.into(), lit(val)]), + None => invoke_func("locate", vec![substr.into(), str.into(), lit(1)]), + } +} + gen_func!(lpad, [col: Column, len: Column, pad: Column], "Left-pad the string column to width len with pad."); gen_func!(ltrim, [col: Column], "Trim the spaces from left end for the specified string value."); +// pub fn mask( +// col: impl Into, +// upper_char: Option>, +// lower_char: Option>, +// digit_char: Option>, +// other_char: Option>, +// ) -> Column { +// let upper_char = upper_char.map(Into::into).unwrap_or_else(|| lit("X")); +// let lower_char = lower_char.map(Into::into).unwrap_or_else(|| lit("x")); +// let digit_char = digit_char.map(Into::into).unwrap_or_else(|| lit("n")); +// let other_char = upper_char.map(Into::into).unwrap_or_else(|| lit(None)); +// +// invoke_func( +// "mask", +// vec![col.into(), upper_char, lower_char, digit_char, other_char], +// ) +// } + gen_func!(octet_length, [col: Column], "Calculates the byte length for the specified string column."); +pub fn parse_url( + url: impl Into, + part_to_extract: impl Into, + key: Option>, +) -> Column { + match key { + Some(val) => invoke_func( + "parse_url", + vec![url.into(), part_to_extract.into(), val.into()], + ), + None => invoke_func("parse_url", vec![url.into(), part_to_extract.into()]), + } +} + +pub fn position( + substr: impl Into, + str: impl Into, + start: Option>, +) -> Column { + match start { + Some(val) => invoke_func("position", vec![substr.into(), str.into(), val.into()]), + None => invoke_func("position", vec![substr.into(), str.into()]), + } +} + +pub fn printf(format: impl Into, cols: I) -> Column +where + I: IntoIterator, + I::Item: Into, +{ + let mut cols = VecExpression::from_iter(cols).expr; + let mut expr = vec![format.into().expression]; + expr.append(&mut cols); + + invoke_func("printf", expr) +} + gen_func!(rlike, [str: Column, regexp: Column], "Returns true if str matches the Java regex regexp, or false otherwise."); gen_func!(regexp, [str: Column, regexp: Column], "Returns true if str matches the Java regex regexp, or false otherwise."); gen_func!(regexp_like, [str: Column, regexp: Column], "Returns true if str matches the Java regex regexp, or false otherwise."); + +pub fn regexp_extract(str: impl Into, pattern: &str, idx: i32) -> Column { + invoke_func("regexp_extract", vec![str.into(), lit(pattern), lit(idx)]) +} + +pub fn regexp_extract_all( + str: impl Into, + regexp: impl Into, + idx: Option>, +) -> Column { + match idx { + Some(val) => invoke_func( + "regexp_extract_all", + vec![str.into(), regexp.into(), val.into()], + ), + None => invoke_func("regexp_extract_all", vec![str.into(), regexp.into()]), + } +} + +pub fn regexp_instr( + str: impl Into, + regexp: impl Into, + idx: Option>, +) -> Column { + match idx { + Some(val) => invoke_func("regexp_instr", vec![str.into(), regexp.into(), val.into()]), + None => invoke_func("regexp_instr", vec![str.into(), regexp.into()]), + } +} + gen_func!(regexp_count, [str: Column, regexp: Column], "Returns a count of the number of times that the Java regex pattern regexp is matched in the string str."); +gen_func!(regexp_replace, [string: Column, pattern: Column, replacement: Column], "Replace all substrings of the specified string value that match regexp with replacement"); gen_func!(regexp_substr, [str: Column, regexp: Column], "Returns the substring that matches the Java regex regexp within the string str."); +pub fn replace( + src: impl Into, + search: impl Into, + replace: Option>, +) -> Column { + match replace { + Some(val) => invoke_func("replace", vec![src.into(), search.into(), val.into()]), + None => invoke_func("replace", vec![src.into(), search.into()]), + } +} + gen_func!(right, [str: Column, len: Column], "Returns the rightmost len`(`len can be string type) characters from the string str, if len is less or equal than 0 the result is an empty string."); gen_func!(ucase, [str: Column], "Returns str with all characters changed to uppercase."); gen_func!(unbase64, [col: Column], "Decodes a BASE64 encoded string column and returns it as a binary column."); @@ -1029,13 +1415,78 @@ gen_func!(rpad, [col: Column, len: Column, pad: Column], "Right-pad the string c gen_func!(repeat, [col: Column, n: Column], "Repeats a string column n times, and returns it as a new string column."); gen_func!(rtrim, [col: Column], "Trim the spaces from right end for the specified string value."); gen_func!(soundex, [col: Column], "Returns the SoundEx encoding for a string"); + +/// Splits str around matches of the given pattern. +pub fn split(str: impl Into, pattern: &str, limit: Option) -> Column { + let values = vec![str.into(), lit(pattern), lit(limit.unwrap_or(-1)).clone()]; + invoke_func("split", values) +} + gen_func!(split_part, [src: Column, delimiter: Column, part_num: Column], "Splits str by delimiter and return requested part of the split (1-based)."); gen_func!(startswith, [str: Column, prefix: Column], "Returns a boolean."); + +pub fn substr( + str: impl Into, + pos: impl Into, + len: Option>, +) -> Column { + match len { + Some(val) => invoke_func("substr", vec![str.into(), pos.into(), val.into()]), + None => invoke_func("substr", vec![str.into(), pos.into()]), + } +} + gen_func!(substring, [src: Column, pos: Column, len: Column], "Substring starts at pos and is of length len when str is String type or returns the slice of byte array that starts at pos in byte and is of length len when str is Binary type."); gen_func!(substring_index, [src: Column, delim: Column, count: Column], "Returns the substring from string str before count occurrences of the delimiter delim."); -gen_func!(to_char, [col: Column, formant: Column], "Convert col to a string based on the format."); -gen_func!(to_number, [col: Column, formant: Column], "Convert string ‘col’ to a number based on the string format ‘format’"); -gen_func!(to_varchar, [col: Column, formant: Column], "Convert col to a string based on the format."); + +pub fn overlay( + src: impl Into, + replace: impl Into, + pos: impl Into, + ignore_nulls: Option>, +) -> Column { + match ignore_nulls { + Some(val) => invoke_func( + "overlay", + vec![src.into(), replace.into(), pos.into(), val.into()], + ), + None => invoke_func( + "overlay", + vec![src.into(), replace.into(), pos.into(), lit(-1)], + ), + } +} + +/// Splits a string into arrays of sentences, where each sentence is an array of words +pub fn sentences( + string: impl Into, + language: Option>, + country: Option>, +) -> Column { + let language = language.map(Into::into).unwrap_or_else(|| lit("")); + let country = country.map(Into::into).unwrap_or_else(|| lit("")); + + invoke_func("sentences", vec![string.into(), language, country]) +} + +pub fn to_binary(col: impl Into, len: Option>) -> Column { + match len { + Some(val) => invoke_func("to_binary", vec![col.into(), val.into()]), + None => invoke_func("to_binary", vec![col.into()]), + } +} + +pub fn try_to_binary(col: impl Into, len: Option>) -> Column { + match len { + Some(val) => invoke_func("try_to_binary", vec![col.into(), val.into()]), + None => invoke_func("try_to_binary", vec![col.into()]), + } +} + +gen_func!(to_char, [col: Column, format: Column], "Convert col to a string based on the format."); +gen_func!(to_number, [col: Column, format: Column], "Convert string ‘col’ to a number based on the string format ‘format’"); +gen_func!(try_to_number, [col: Column, format: Column], "Convert string ‘col’ to a number based on the string format ‘format’"); +gen_func!(to_varchar, [col: Column, format: Column], "Convert col to a string based on the format."); gen_func!(translate, [src_col: Column, matching: Column, replace: Column], "A function translate any character in the srcCol by a character in matching."); gen_func!(trim, [col: Column], "Trim the spaces from both ends for the specified string column."); gen_func!(upper, [col: Column], "Converts a string expression to upper case."); @@ -1049,6 +1500,62 @@ gen_func!(bit_get, [col: Column, pos: Column], "Returns the value of the bit (0 gen_func!(getbit, [col: Column, pos: Column], "Returns the value of the bit (0 or 1) at the specified position."); // misc functions + +/// Returns a decrypted value of *input* using AES in *mode* with *padding* +pub fn aes_decrypt( + input: impl Into, + key: impl Into, + mode: Option>, + padding: Option>, + aad: Option>, +) -> Column { + let mode = mode.map(Into::into).unwrap_or_else(|| lit("GCM")); + let padding = padding.map(Into::into).unwrap_or_else(|| lit("DEFAULT")); + let aad = aad.map(Into::into).unwrap_or_else(|| lit("")); + + invoke_func( + "aes_decrypt", + vec![input.into(), key.into(), mode, padding, aad], + ) +} + +pub fn try_aes_decrypt( + input: impl Into, + key: impl Into, + mode: Option>, + padding: Option>, + aad: Option>, +) -> Column { + let mode = mode.map(Into::into).unwrap_or_else(|| lit("GCM")); + let padding = padding.map(Into::into).unwrap_or_else(|| lit("DEFAULT")); + let aad = aad.map(Into::into).unwrap_or_else(|| lit("")); + + invoke_func( + "try_aes_decrypt", + vec![input.into(), key.into(), mode, padding, aad], + ) +} + +/// Returns a encrypted value of *input* using AES in *mode* with the specified *padding* +pub fn aes_encrypt( + input: impl Into, + key: impl Into, + mode: Option>, + padding: Option>, + iv: Option>, + aad: Option>, +) -> Column { + let mode = mode.map(Into::into).unwrap_or_else(|| lit("GCM")); + let padding = padding.map(Into::into).unwrap_or_else(|| lit("DEFAULT")); + let iv = iv.map(Into::into).unwrap_or_else(|| lit("")); + let aad = aad.map(Into::into).unwrap_or_else(|| lit("")); + + invoke_func( + "aes_encrypt", + vec![input.into(), key.into(), mode, padding, iv, aad], + ) +} + gen_func!(bitmap_bit_position, [col: Column], "Returns the bit position for the given input column."); gen_func!(bitmap_bucket_number, [col: Column], "Returns the bucket number for the given input column."); gen_func!(bitmap_construct_agg, [col: Column], "Returns a bitmap with the positions of the bits set from all the values from the input column."); @@ -1075,9 +1582,31 @@ gen_func!(sha2, [col: Column, num_bits: Column], "Returns the hex string result gen_func!(crc32, [col: Column], "Calculates the cyclic redundancy check value (CRC32) of a binary column and returns the value as a bigint."); gen_func!(hash, [cols: _], "Calculates the hash code of given columns, and returns the result as an int column."); gen_func!(xxhash64, [cols: _], "Calculates the hash code of given columns using the 64-bit variant of the xxHash algorithm, and returns the result as a long column."); + +/// Returns *null* if the input column is *true*; throws an exception with the provided error +/// message otherwise +pub fn assert_true(col: impl Into, err_msg: Option>) -> Column { + match err_msg { + Some(val) => invoke_func("assert_true", vec![col.into(), val.into()]), + None => invoke_func("assert_true", vec![col.into()]), + } +} + gen_func!(raise_error, [col: Column], "Throws an exception with the provided error message."); gen_func!(reflect, [cols: _], "Calls a method with reflection."); gen_func!(hll_sketch_estimate, [col: Column], "Returns the estimated number of unique values given the binary representation of a Datasketches HllSketch."); + +pub fn hll_union( + col1: impl Into, + col2: impl Into, + allow_different_lg_config_k: Option, +) -> Column { + match allow_different_lg_config_k { + Some(val) => invoke_func("hll_union", vec![col1.into(), col2.into(), lit(val)]), + None => invoke_func("hll_union", vec![col1.into(), col2.into()]), + } +} + gen_func!(java_method, [cols: _], "Calls a method with reflection."); gen_func!(stack, [cols: _], "Separates col1, …, colk into n rows"); gen_func!(user, [], "Returns the current database."); @@ -1393,6 +1922,35 @@ mod tests { false ); + // Test sort functions and column methods + #[tokio::test] + async fn test_func_from_csv() -> Result<(), SparkError> { + let spark = setup().await; + + let df = spark.sql("SELECT ' abc' as value").await?; + + let mut opts = HashMap::new(); + opts.insert("ignoreLeadingWhiteSpace", "True"); + + let res = df + .select([from_csv("value", lit("a STRING"), Some(opts)).alias("value")]) + .collect() + .await?; + + let a: ArrayRef = Arc::new(Int32Array::from(vec![1])); + let b: ArrayRef = Arc::new(Int32Array::from(vec![2])); + let c: ArrayRef = Arc::new(Int32Array::from(vec![3])); + + let expected = RecordBatch::try_from_iter_with_nullable(vec![ + ("a", a, false), + ("b", b, false), + ("c", c, false), + ])?; + + assert_eq!(expected, res); + Ok(()) + } + // Test sort functions and column methods #[tokio::test] async fn test_func_asc() -> Result<(), SparkError> {