diff --git a/.gitignore b/.gitignore index 6543906..5cf11b6 100644 --- a/.gitignore +++ b/.gitignore @@ -137,5 +137,6 @@ tags.temp # Local ext/arabic_rom/data +scriptshifter/data/*.db !.keep VERSION diff --git a/scriptshifter/data/.keep b/scriptshifter/data/.keep new file mode 100644 index 0000000..e69de29 diff --git a/scriptshifter/tables/__init__.py b/scriptshifter/tables/__init__.py index 94a34d5..863efa2 100644 --- a/scriptshifter/tables/__init__.py +++ b/scriptshifter/tables/__init__.py @@ -2,6 +2,7 @@ import re import sqlite3 +from collections import defaultdict from importlib import import_module from json import dumps as jdumps, loads as jloads from os import R_OK, access, environ, makedirs, path, unlink @@ -184,6 +185,15 @@ def init_db(): unlink(TMP_DB_PATH) +def get_connection(): + """ + Get the default DB connection object. + + To be closed by the caller or used as a context. + """ + return sqlite3.connect(DB_PATH) + + def populate_table(conn, tid, tname): data = load_table(tname) flags = 0 @@ -206,23 +216,25 @@ def populate_table(conn, tid, tname): continue # Transliteration map. + sort = 1 for k, v in sec.get("map", {}): conn.execute( """INSERT INTO tbl_trans_map ( - lang_id, dir, src, dest - ) VALUES (?, ?, ?, ?)""", - (tid, t_dir, k, v)) + lang_id, dir, src, dest, sort + ) VALUES (?, ?, ?, ?, ?)""", + (tid, t_dir, k, v, sort)) + sort += 1 # hooks. for k, v in sec.get("hooks", {}).items(): for i, hook_data in enumerate(v, start=1): conn.execute( """INSERT INTO tbl_hook ( - lang_id, dir, name, sort, fn, signature - ) VALUES (?, ?, ?, ?, ?, ?)""", + lang_id, dir, name, sort, module, fn, kwargs + ) VALUES (?, ?, ?, ?, ?, ?, ?)""", ( - tid, t_dir, k, i, - hook_data[0].__name__, jdumps(hook_data[1:]))) + tid, t_dir, k, i, hook_data[0], + hook_data[1].__name__, jdumps(hook_data[2]))) # Ignore rules (R2S only). for row in sec.get("ignore", []): @@ -277,7 +289,7 @@ def list_tables(): Note that this may not correspond to all the table files in the data folder, but only those exposed in the index. """ - conn = sqlite3.connect(DB_PATH) + conn = get_connection() with conn: data = conn.execute( @@ -463,7 +475,7 @@ def load_hook_fn(cname, sec): f"Hook function {fnname} defined in {cname} configuration " f"not found in module {HOOK_PKG_PATH}.{modname}!" ) - hook_fn[cfg_hook].append((fn, fn_kwargs)) + hook_fn[cfg_hook].append((modname, fn, fn_kwargs)) return hook_fn @@ -471,32 +483,16 @@ def load_hook_fn(cname, sec): def get_language(lang): """ Get all language options from the DB. """ - conn = sqlite3.connect(DB_PATH) + conn = get_connection() with conn: - lang_q = conn.execute( - """SELECT id, name, label, features, marc_code, description - FROM tbl_language WHERE name = ?""", (lang,)) - lang_data = lang_q.fetchone() - lang_id = lang_data[0] - - data = { - "name": lang_data[1], - "label": lang_data[2], - "has_s2r": bool(lang_data[3] & FEAT_S2R), - "has_r2s": bool(lang_data[3] & FEAT_R2S), - "case_sensitive": not (lang_data[3] & FEAT_CASEI), - "marc_code": lang_data[4], - "description": lang_data[5], - } + general = get_lang_general(conn, lang) + lang_id = general["id"] + data = general["data"] # Normalization. 
- norm_q = conn.execute( - """SELECT src, dest FROM tbl_normalize - WHERE lang_id = ?""", - (lang_id,)) - norm_data = {row[0]: row[1] for row in norm_q} + norm_data = get_lang_normalize(conn, lang_id) if len(norm_data): data["normalize"] = norm_data @@ -504,26 +500,12 @@ def get_language(lang): if data["has_s2r"]: data["script_to_roman"] = {} - s2r_q = conn.execute( - """SELECT src, dest FROM tbl_trans_map - WHERE lang_id = ? AND dir = ?""", - (lang_id, FEAT_S2R)) - s2r_map = tuple((row[0], row[1]) for row in s2r_q) + s2r_map = tuple( + row for row in get_lang_map(conn, lang_id, FEAT_S2R)) if len(s2r_map): data["script_to_roman"]["map"] = s2r_map - hooks_q = conn.execute( - """SELECT name, fn, signature - FROM tbl_hook WHERE lang_id = ? AND dir = ? - ORDER BY sort""", - (lang_id, FEAT_S2R)) - s2r_hooks = [ - { - "name": row[0], - "fn": row[1], - "signature": jloads(row[2]), - } for row in hooks_q - ] + s2r_hooks = get_lang_hooks(conn, lang_id, FEAT_S2R) if len(s2r_hooks): data["script_to_roman"]["hooks"] = s2r_hooks @@ -531,56 +513,136 @@ def get_language(lang): if data["has_r2s"]: data["roman_to_script"] = {} - r2s_q = conn.execute( - """SELECT src, dest FROM tbl_trans_map - WHERE lang_id = ? AND dir = ?""", - (lang_id, FEAT_R2S)) - r2s_map = tuple((row[0], row[1]) for row in r2s_q) + r2s_map = tuple( + row for row in get_lang_map(conn, lang_id, FEAT_R2S)) if len(r2s_map): data["roman_to_script"]["map"] = r2s_map - ignore_q = conn.execute( - """SELECT rule, features FROM tbl_ignore - WHERE lang_id = ?""", - (lang_id,)) - # Features (regular expressions) not implemented yet. - r2s_ignore = tuple(row[0] for row in ignore_q) + r2s_ignore = get_lang_ignore(conn, lang_id) if len(r2s_ignore): data["roman_to_script"]["ignore"] = r2s_ignore - hooks_q = conn.execute( - """SELECT name, fn, signature - FROM tbl_hook WHERE lang_id = ? AND dir = ? - ORDER BY sort""", - (lang_id, FEAT_R2S)) - r2s_hooks = [ - { - "name": row[0], - "fn": row[1], - "signature": jloads(row[2]), - } for row in hooks_q - ] + r2s_hooks = get_lang_hooks(conn, lang_id, FEAT_R2S) if len(r2s_hooks): data["roman_to_script"]["hooks"] = r2s_hooks - options_q = conn.execute( - """SELECT name, label, description, dtype, options, default_v - FROM tbl_option - WHERE lang_id = ?""", - (lang_id,)) + opt_data = get_lang_options(conn, lang_id) + if len(opt_data): + data["options"] = opt_data + + double_cap = get_lang_dcap(conn, lang_id) + if len(double_cap): + data["double_cap"] = double_cap + + conn.close() + + return data - opt_data = tuple( + +def get_lang_general(conn, lang): + """ Language general attributes. """ + lang_q = conn.execute( + """SELECT id, name, label, features, marc_code, description + FROM tbl_language WHERE name = ?""", (lang,)) + lang_data = lang_q.fetchone() + + return { + "id": lang_data[0], + "data": { + "name": lang_data[1], + "label": lang_data[2], + "has_s2r": bool(lang_data[3] & FEAT_S2R), + "has_r2s": bool(lang_data[3] & FEAT_R2S), + "case_sensitive": not (lang_data[3] & FEAT_CASEI), + "marc_code": lang_data[4], + "description": lang_data[5], + }, + } + + +def get_lang_normalize(conn, lang_id): + qry = conn.execute( + """SELECT src, dest FROM tbl_normalize + WHERE lang_id = ?""", + (lang_id,)) + return {row[0]: row[1] for row in qry} + + +def get_lang_ignore(conn, lang_id): + """ + Ignore list as a tuple. + """ + qry = conn.execute( + """SELECT rule, features FROM tbl_ignore + WHERE lang_id = ?""", + (lang_id,)) + # Features (regular expressions) not implemented yet. 
+    return tuple(row[0] for row in qry)
+
+
+def get_lang_map(conn, lang_id, t_dir):
+    """
+    S2R or R2S map.
+
+    Generator of tuples (source, destination).
+    """
+    qry = conn.execute(
+            """SELECT src, dest FROM tbl_trans_map
+            WHERE lang_id = ? AND dir = ?
+            ORDER BY sort ASC""",
+            (lang_id, t_dir))
+
+    for row in qry:
+        yield (Token(row[0]), row[1])
+
+
+def get_lang_options(conn, lang_id):
+    """ Language options as a tuple of dictionaries. """
+    qry = conn.execute(
+            """SELECT name, label, description, dtype, options, default_v
+            FROM tbl_option
+            WHERE lang_id = ?""",
+            (lang_id,))
+
+    return tuple(
+        {
+            "id": row[0],
+            "label": row[1],
+            "description": row[2],
+            "type": row[3],
+            "options": jloads(row[4]) if row[4] else None,
+            "default": row[5],
+        }
+        for row in qry
+    )
+
+
+def get_lang_hooks(conn, lang_id, t_dir):
+    """ Language hooks in sorting order. """
+    hooks = defaultdict(list)
+
+    qry = conn.execute(
+            """SELECT name, module, fn, kwargs
+            FROM tbl_hook WHERE lang_id = ? AND dir = ?
+            ORDER BY name, sort""",
+            (lang_id, t_dir))
+
+    for row in qry:
+        hooks[row[0]].append(
             {
-                "id": row[0],
-                "label": row[1],
-                "description": row[2],
-                "type": row[3],
-                "options": jloads(row[4]) if row[4] else None,
-                "default": row[5],
+                "module_name": row[1],
+                "fn_name": row[2],
+                "kwargs": jloads(row[3]),
             }
-            for row in options_q
         )
-    if len(opt_data):
-        data["options"] = opt_data
 
-    return data
+    return hooks
+
+
+def get_lang_dcap(conn, lang_id):
+    qry = conn.execute(
+            """SELECT rule
+            FROM tbl_double_cap WHERE lang_id = ?""",
+            (lang_id,))
+
+    return tuple(row[0] for row in qry)
diff --git a/scriptshifter/tables/init.sql b/scriptshifter/tables/init.sql
index dcb1f93..a563d1c 100644
--- a/scriptshifter/tables/init.sql
+++ b/scriptshifter/tables/init.sql
@@ -23,22 +23,28 @@ CREATE TABLE tbl_trans_map (
     dir TINYINT NOT NULL DEFAULT 0,     /* 1 = S2R; 2 = R2S */
     src TEXT NOT NULL,
     dest TEXT,
+    sort INT NOT NULL,      /* Smaller values have higher priority. */
 
     FOREIGN KEY (lang_id) REFERENCES tbl_language(id) ON DELETE CASCADE
 );
 CREATE UNIQUE INDEX idx_trans_lookup ON tbl_trans_map (lang_id, dir, src);
+CREATE INDEX idx_trans_map_sort ON tbl_trans_map (sort ASC);
 
 /*
  * Processing hooks.
+ *
+ * Note that multiple functions may be grouped under the same hook, lang, and
+ * direction. These are ordered by `sort`.
  */
 CREATE TABLE tbl_hook (
     id INTEGER PRIMARY KEY,
     lang_id INTEGER NOT NULL,
     dir TINYINT NOT NULL DEFAULT 0,     /* 1 = S2R; 2 = R2S */
-    name TEXT NOT NULL,    /* Hook name. */
+    name TEXT NOT NULL,     /* Hook name. */
     sort INT NOT NULL,      /* Function sorting order within the hook. */
+    module TEXT NOT NULL,   /* Module name. */
     fn TEXT NOT NULL,       /* Function name. */
-    signature TEXT,         /* Arguments as JSON blob. */
+    kwargs TEXT,            /* KW arguments as JSON blob. */
 
     FOREIGN KEY (lang_id) REFERENCES tbl_language(id) ON DELETE CASCADE
 );
diff --git a/scriptshifter/trans.py b/scriptshifter/trans.py
index ea55046..d9c06c6 100644
--- a/scriptshifter/trans.py
+++ b/scriptshifter/trans.py
@@ -1,9 +1,13 @@
 import logging
 
+from importlib import import_module
 from re import compile
 
 from scriptshifter.exceptions import BREAK, CONT
-from scriptshifter.tables import BOW, EOW, WORD_BOUNDARY, load_table
+from scriptshifter.tables import (
+        BOW, EOW, WORD_BOUNDARY, FEAT_R2S, FEAT_S2R, HOOK_PKG_PATH,
+        get_connection, get_lang_dcap, get_lang_general, get_lang_hooks,
+        get_lang_ignore, get_lang_map, get_lang_normalize)
 
 
 # Match multiple spaces.
@@ -15,6 +19,8 @@ class Context:
     """
     Context used within the transliteration and passed to hook functions.
+
+    Use within a `with` block for proper cleanup.
     """
     @property
     def src(self):
@@ -28,23 +34,36 @@ def src(self):
     def src(self):
         raise NotImplementedError("Attribute is read-only.")
 
-    def __init__(self, src, general, langsec, options={}):
+    def __init__(self, lang, src, t_dir, options={}):
         """
         Initialize a context.
 
         Args:
+            lang (str): Language name.
             src (str): The original text. Read-only.
-            general (dict): general section of the current config.
-            langsec (dict): Language configuration section being used.
+            t_dir (int): The direction of transliteration.
+                Either FEAT_R2S or FEAT_S2R.
             options (dict): extra options as a dict.
         """
+        self.lang = lang
         self._src = src
-        self.general = general
+        self.t_dir = t_dir
+        self.conn = get_connection()
+        with self.conn as conn:
+            general = get_lang_general(conn, self.lang)
+            self.general = general["data"]
+            self.lang_id = general["id"]
         self.options = options
-        self.langsec = langsec
+        self.hooks = get_lang_hooks(self.conn, self.lang_id, self.t_dir)
         self.dest_ls = []
         self.warnings = []
 
+    def __enter__(self):
+        return self
+
+    def __exit__(self, exc_type, exc_value, traceback):
+        self.conn.close()
+
 
 def transliterate(src, lang, t_dir="s2r", capitalize=False, options={}):
     """
@@ -73,234 +91,226 @@ def transliterate(src, lang, t_dir="s2r", capitalize=False, options={}):
     Return:
         str: The transliterated string.
     """
-    source_str = "Latin" if t_dir == "r2s" else lang
-    target_str = lang if t_dir == "r2s" else "Latin"
-    logger.info(f"Transliteration is from {source_str} to {target_str}.")
-
-    cfg = load_table(lang)
-    logger.info(f"Loaded table for {lang}.")
-
-    # General directives.
-    general = cfg.get("general", {})
-
-    if t_dir == "s2r" and "script_to_roman" not in cfg:
-        raise NotImplementedError(
-            f"Script-to-Roman transliteration not yet supported for {lang}."
-        )
-    elif t_dir == "r2s" and "roman_to_script" not in cfg:
-        raise NotImplementedError(
-            f"Roman-to-script transliteration not yet supported for {lang}."
-        )
+    # Map t_dir to constant.
+    t_dir = FEAT_S2R if t_dir == "s2r" else FEAT_R2S
 
-    langsec = (
-            cfg["script_to_roman"] if t_dir == "s2r"
-            else cfg["roman_to_script"])
-    # langsec_dir = langsec.get("directives", {})
-    langsec_hooks = langsec.get("hooks", {})
+    source_str = "Roman" if t_dir == FEAT_R2S else lang
+    target_str = lang if t_dir == FEAT_R2S else "Roman"
+    logger.info(f"Transliteration is from {source_str} to {target_str}.")
 
     src = src.strip()
     options["capitalize"] = capitalize
-    ctx = Context(src, general, langsec, options)
-
-    # This hook may take over the whole transliteration process or delegate it
-    # to some external process, and return the output string directly.
-    if _run_hook("post_config", ctx, langsec_hooks) == BREAK:
-        return getattr(ctx, "dest", ""), ctx.warnings
-
-    if "normalize" in ctx.langsec:
-        _normalize_src(ctx)
-
-    if _run_hook("post_normalize", ctx, langsec_hooks) == BREAK:
-        return getattr(ctx, "dest", ""), ctx.warnings
-
-    # Loop through source characters. The increment of each loop depends on
-    # the length of the token that eventually matches.
-    ignore_list = langsec.get("ignore", [])  # Only present in R2S
-    ctx.cur = 0
-    word_boundary = langsec.get("word_boundary", WORD_BOUNDARY)
-
-    while ctx.cur < len(ctx.src):
-        # Reset cursor position flags.
-        # Carry over extended "beginning of word" flag.
-        ctx.cur_flags = 0
-        cur_char = ctx.src[ctx.cur]
-
-        # Look for a word boundary and flag word beginning/end it if found.
-        if _is_bow(ctx.cur, ctx, word_boundary):
-            # Beginning of word.
-            logger.debug(f"Beginning of word at position {ctx.cur}.")
-            ctx.cur_flags |= BOW
-        if _is_eow(ctx.cur, ctx, word_boundary):
-            # End of word.
-            logger.debug(f"End of word at position {ctx.cur}.")
-            ctx.cur_flags |= EOW
-
-        # This hook may skip the parsing of the current
-        # token or exit the scanning loop altogether.
-        hret = _run_hook("begin_input_token", ctx, langsec_hooks)
-        if hret == BREAK:
-            logger.debug("Breaking text scanning from hook signal.")
-            break
-        if hret == CONT:
-            logger.debug("Skipping scanning iteration from hook signal.")
-            continue
-
-        # Check ignore list. Find as many subsequent ignore tokens
-        # as possible before moving on to looking for match tokens.
-        ctx.tk = None
-        while True:
-            ctx.ignoring = False
-            for ctx.tk in ignore_list:
-                hret = _run_hook("pre_ignore_token", ctx, langsec_hooks)
-                if hret == BREAK:
-                    break
-                if hret == CONT:
-                    continue
+    with Context(lang, src, t_dir, options) as ctx:
+
+        if t_dir == FEAT_S2R and not ctx.general["has_s2r"]:
+            raise NotImplementedError(
+                f"Script-to-Roman not yet supported for {lang}."
+            )
+        if t_dir == FEAT_R2S and not ctx.general["has_r2s"]:
+            raise NotImplementedError(
+                f"Roman-to-script not yet supported for {lang}."
+            )
+
+        # This hook may take over the whole transliteration process or delegate
+        # it to some external process, and return the output string directly.
+        if _run_hook("post_config", ctx) == BREAK:
+            return getattr(ctx, "dest", ""), ctx.warnings
+
+        _normalize_src(ctx, get_lang_normalize(ctx.conn, ctx.lang_id))
+
+        if _run_hook("post_normalize", ctx) == BREAK:
+            return getattr(ctx, "dest", ""), ctx.warnings
+
+        # Loop through source characters. The increment of each loop depends on
+        # the length of the token that eventually matches.
+        ctx.cur = 0
+
+        while ctx.cur < len(ctx.src):
+            # Reset cursor position flags.
+            # Carry over extended "beginning of word" flag.
+            ctx.cur_flags = 0
+            cur_char = ctx.src[ctx.cur]
+
+            # Look for a word boundary and flag word beginning/end if found.
+            if _is_bow(ctx.cur, ctx, WORD_BOUNDARY):
+                # Beginning of word.
+                logger.debug(f"Beginning of word at position {ctx.cur}.")
+                ctx.cur_flags |= BOW
+            if _is_eow(ctx.cur, ctx, WORD_BOUNDARY):
+                # End of word.
+                logger.debug(f"End of word at position {ctx.cur}.")
+                ctx.cur_flags |= EOW
+
+            # This hook may skip the parsing of the current
+            # token or exit the scanning loop altogether.
+            hret = _run_hook("begin_input_token", ctx)
+            if hret == BREAK:
+                logger.debug("Breaking text scanning from hook signal.")
+                break
+            if hret == CONT:
+                logger.debug("Skipping scanning iteration from hook signal.")
+                continue
 
-                step = len(ctx.tk)
-                if ctx.tk == ctx.src[ctx.cur:ctx.cur + step]:
-                    # The position matches an ignore token.
-                    hret = _run_hook("on_ignore_match", ctx, langsec_hooks)
+            # Check ignore list. Find as many subsequent ignore tokens
+            # as possible before moving on to looking for match tokens.
+            ctx.tk = None
+            while True:
+                ctx.ignoring = False
+                for ctx.tk in get_lang_ignore(ctx.conn, ctx.lang_id):
+                    hret = _run_hook("pre_ignore_token", ctx)
                     if hret == BREAK:
                         break
                     if hret == CONT:
                         continue
 
-                    logger.info(f"Ignored token: {ctx.tk}")
-                    ctx.dest_ls.append(ctx.tk)
-                    ctx.cur += step
-                    cur_char = ctx.src[ctx.cur]
-                    ctx.ignoring = True
+                    step = len(ctx.tk)
+                    if ctx.tk == ctx.src[ctx.cur:ctx.cur + step]:
+                        # The position matches an ignore token.
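+                        # Hook functions may reject this match (CONT) or
+                        # abort the ignore scan (BREAK).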
+                        hret = _run_hook("on_ignore_match", ctx)
+                        if hret == BREAK:
+                            break
+                        if hret == CONT:
+                            continue
+
+                        logger.info(f"Ignored token: {ctx.tk}")
+                        ctx.dest_ls.append(ctx.tk)
+                        ctx.cur += step
+                        cur_char = ctx.src[ctx.cur]
+                        ctx.ignoring = True
+                        break
+                # We looked through all ignore tokens, not found any. Move on.
+                if not ctx.ignoring:
                     break
-            # We looked through all ignore tokens, not found any. Move on.
-            if not ctx.ignoring:
-                break
-            # Otherwise, if we found a match, check if the next position may be
-            # ignored as well.
-
-        delattr(ctx, "tk")
-        delattr(ctx, "ignoring")
-
-        # Begin transliteration token lookup.
-        ctx.match = False
-
-        for ctx.src_tk, ctx.dest_str in langsec["map"]:
-            hret = _run_hook("pre_tx_token", ctx, langsec_hooks)
-            if hret == BREAK:
-                break
-            if hret == CONT:
-                continue
+                # Otherwise, if we found a match, check if the next position
+                # may be ignored as well.
 
-            step = len(ctx.src_tk.content)
-            # If the token is longer than the remaining of the string,
-            # it surely won't match.
-            if ctx.cur + step > len(ctx.src):
-                continue
+                delattr(ctx, "tk")
+                delattr(ctx, "ignoring")
 
-            # If the first character of the token is greater (= higher code
-            # point value) than the current character, then break the loop
-            # without a match, because we know there won't be any more match
-            # due to the alphabetical ordering.
-            if ctx.src_tk.content[0] > cur_char:
-                logger.debug(
-                        f"{ctx.src_tk.content} is after "
-                        f"{ctx.src[ctx.cur:ctx.cur + step]}. Breaking loop.")
-                break
+            # Begin transliteration token lookup.
+            ctx.match = False
 
-            # If src_tk has a WB flag but the token is not at WB, skip.
-            if (
-                (ctx.src_tk.flags & BOW and not ctx.cur_flags & BOW)
-                or
-                # Can't rely on EOW flag, we must check on the last character
-                # of the potential match.
-                (ctx.src_tk.flags & EOW and not _is_eow(
-                        ctx.cur + step - 1, ctx, word_boundary))
-            ):
-                continue
-
-            # Longer tokens should be guaranteed to be scanned before their
-            # substrings at this point.
-            # Similarly, flagged tokens are evaluated first.
-            if ctx.src_tk.content == ctx.src[ctx.cur:ctx.cur + step]:
-                ctx.match = True
-                # This hook may skip this token or break out of the token
-                # lookup for the current position.
-                hret = _run_hook("on_tx_token_match", ctx, langsec_hooks)
+            for ctx.src_tk, ctx.dest_str in get_lang_map(
+                    ctx.conn, ctx.lang_id, ctx.t_dir):
+                hret = _run_hook("pre_tx_token", ctx)
                 if hret == BREAK:
                     break
                 if hret == CONT:
                     continue
 
-                # A match is found. Stop scanning tokens, append result, and
-                # proceed scanning the source.
+                step = len(ctx.src_tk.content)
+                # If the token is longer than the remaining of the string,
+                # it surely won't match.
+                if ctx.cur + step > len(ctx.src):
+                    continue
 
-                # Capitalization.
+                # If the first character of the token is greater (= higher code
+                # point value) than the current character, then break the loop
+                # without a match, because we know there won't be any more
+                # matches due to the alphabetical ordering.
+                if ctx.src_tk.content[0] > cur_char:
+                    logger.debug(
+                            f"{ctx.src_tk.content} is after "
+                            f"{ctx.src[ctx.cur:ctx.cur + step]}. "
+                            "Breaking loop.")
+                    break
+
+                # If src_tk has a WB flag but the token is not at WB, skip.
                 if (
-                    (ctx.options["capitalize"] == "first" and ctx.cur == 0)
+                    (ctx.src_tk.flags & BOW and not ctx.cur_flags & BOW)
                     or
-                    (
-                        ctx.options["capitalize"] == "all"
-                        and ctx.cur_flags & BOW
-                    )
+                    # Can't rely on EOW flag, we must check on the last
+                    # character of the potential match.
+ (ctx.src_tk.flags & EOW and not _is_eow( + ctx.cur + step - 1, ctx, WORD_BOUNDARY)) ): - logger.info("Capitalizing token.") - double_cap = False - for dcap_rule in ctx.langsec.get("double_cap", []): - if ctx.dest_str == dcap_rule: - ctx.dest_str = ctx.dest_str.upper() - double_cap = True - break - if not double_cap: - ctx.dest_str = ( - ctx.dest_str[0].upper() + ctx.dest_str[1:]) + continue - ctx.dest_ls.append(ctx.dest_str) - ctx.cur += step - break + # Longer tokens should be guaranteed to be scanned before their + # substrings at this point. + # Similarly, flagged tokens are evaluated first. + if ctx.src_tk.content == ctx.src[ctx.cur:ctx.cur + step]: + ctx.match = True + # This hook may skip this token or break out of the token + # lookup for the current position. + hret = _run_hook("on_tx_token_match", ctx) + if hret == BREAK: + break + if hret == CONT: + continue - if ctx.match is False: - delattr(ctx, "match") - hret = _run_hook("on_no_tx_token_match", ctx, langsec_hooks) - if hret == BREAK: - break - if hret == CONT: - continue + # A match is found. Stop scanning tokens, append result, + # and proceed scanning the source. + + # Capitalization. + if ( + (ctx.options["capitalize"] == "first" and ctx.cur == 0) + or + ( + ctx.options["capitalize"] == "all" + and ctx.cur_flags & BOW + ) + ): + logger.info("Capitalizing token.") + double_cap = False + for dcap_rule in get_lang_dcap(ctx.conn, ctx.lang_id): + if ctx.dest_str == dcap_rule: + ctx.dest_str = ctx.dest_str.upper() + double_cap = True + break + if not double_cap: + ctx.dest_str = ( + ctx.dest_str[0].upper() + ctx.dest_str[1:]) + + ctx.dest_ls.append(ctx.dest_str) + ctx.cur += step + break + + if ctx.match is False: + delattr(ctx, "match") + hret = _run_hook("on_no_tx_token_match", ctx) + if hret == BREAK: + break + if hret == CONT: + continue - # No match found. Copy non-mapped character (one at a time). - logger.info( - f"Token {cur_char} (\\u{hex(ord(cur_char))[2:]}) " - f"at position {ctx.cur} is not mapped.") - ctx.dest_ls.append(cur_char) - ctx.cur += 1 - else: - delattr(ctx, "match") - delattr(ctx, "cur_flags") + # No match found. Copy non-mapped character (one at a time). + logger.info( + f"Token {cur_char} (\\u{hex(ord(cur_char))[2:]}) " + f"at position {ctx.cur} is not mapped.") + ctx.dest_ls.append(cur_char) + ctx.cur += 1 + else: + delattr(ctx, "match") + delattr(ctx, "cur_flags") - delattr(ctx, "cur") + delattr(ctx, "cur") - # This hook may take care of the assembly and cause the function to return - # its own return value. - hret = _run_hook("pre_assembly", ctx, langsec_hooks) - if hret is not None: - return hret, ctx.warnings + # This hook may take care of the assembly and cause the function to + # return its own return value. + hret = _run_hook("pre_assembly", ctx) + if hret is not None: + return hret, ctx.warnings - logger.debug(f"Output list: {ctx.dest_ls}") - ctx.dest = "".join(ctx.dest_ls) + logger.debug(f"Output list: {ctx.dest_ls}") + ctx.dest = "".join(ctx.dest_ls) - # This hook may reassign the output string and/or cause the function to - # return it immediately. - hret = _run_hook("post_assembly", ctx, langsec_hooks) - if hret is not None: - return hret, ctx.warnings + # This hook may reassign the output string and/or cause the function to + # return it immediately. + hret = _run_hook("post_assembly", ctx) + if hret is not None: + return hret, ctx.warnings - # Strip multiple spaces and leading/trailing whitespace. 
- ctx.dest = MULTI_WS_RE.sub(r"\1", ctx.dest.strip()) + # Strip multiple spaces and leading/trailing whitespace. + ctx.dest = MULTI_WS_RE.sub(r"\1", ctx.dest.strip()) - return ctx.dest, ctx.warnings + return ctx.dest, ctx.warnings -def _normalize_src(ctx): - for nk, nv in ctx.langsec.get("normalize", {}).items(): +def _normalize_src(ctx, norm_rules): + for nk, nv in norm_rules.items(): ctx._src = ctx.src.replace(nk, nv) logger.debug(f"Normalized source: {ctx.src}") @@ -317,11 +325,13 @@ def _is_eow(cur, ctx, word_boundary): ) and (ctx.src[cur] not in word_boundary) -def _run_hook(hname, ctx, hooks): +def _run_hook(hname, ctx): ret = None - for hook_def in hooks.get(hname, []): - kwargs = hook_def[1] if len(hook_def) > 1 else {} - ret = hook_def[0](ctx, **kwargs) + for hook_def in ctx.hooks.get(hname, []): + fn = getattr( + import_module("." + hook_def["module_name"], HOOK_PKG_PATH), + hook_def["fn_name"]) + ret = fn(ctx, **hook_def["kwargs"]) if ret in (BREAK, CONT): # This will stop parsing hooks functions and tell the caller to # break out of the outer loop or skip iteration.
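
A minimal sketch of how the pieces introduced above compose when reading hook
definitions back from the compiled DB. Illustrative only, not part of the
patch: "hebrew" is a placeholder for any language present in the DB, and the
loop mirrors the dynamic import that _run_hook() performs.

    from importlib import import_module

    from scriptshifter.tables import (
            FEAT_S2R, HOOK_PKG_PATH, get_connection, get_lang_general,
            get_lang_hooks)

    conn = get_connection()
    with conn:
        lang_id = get_lang_general(conn, "hebrew")["id"]
        # Maps hook name to an ordered list of
        # {"module_name": ..., "fn_name": ..., "kwargs": ...} dicts.
        hooks = get_lang_hooks(conn, lang_id, FEAT_S2R)
    conn.close()

    for hook_def in hooks.get("post_config", []):
        # Resolve the stored module and function names, as _run_hook() does.
        fn = getattr(
                import_module("." + hook_def["module_name"], HOOK_PKG_PATH),
                hook_def["fn_name"])
        # fn would then be called as fn(ctx, **hook_def["kwargs"]).

Hook callables are stored by module and function name and resolved at call
time, so none of the parsed YAML configuration needs to stay in memory.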