Skip to content

Commit

Permalink
works standalone using regex, drop CLAN binaries dependencies
Browse files Browse the repository at this point in the history
  • Loading branch information
macramole committed Sep 8, 2021
1 parent 5150b5a commit aa34b53
Show file tree
Hide file tree
Showing 2 changed files with 131 additions and 95 deletions.
218 changes: 128 additions & 90 deletions ChaFile.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,6 @@
import re
from log import Log

# If you haven't created a symlink "clanBin" to CLAN binaries
# Change this to point to your CLAN binaries
CLAN_BIN_PATH = os.path.join( os.path.dirname(__file__), "./clanBin/" )

# LINE constants. Use these for getting data from each line
LINE_UTTERANCE = "emisión"
LINE_NUMBER = "número"
Expand Down Expand Up @@ -75,7 +71,7 @@
}
###############################

BULLET_TAG = "{0x15}"
BULLET_TAG = "\x15"

# TIER constants
TIER_MOR = "mor"
Expand Down Expand Up @@ -151,6 +147,8 @@ class ChaFile:
processedNouns = False
processedAdjectives = False

morFound = False #true if MOR was found in at least one line

def _parseMor(self, morContent, lineNumber):
"""Internal use. Parse MOR tier
Expand Down Expand Up @@ -236,7 +234,7 @@ def _parsePra(self, praContent, lineNumber):
"""
pra = praContent.strip()[1:].strip()
if not pra in TIER_PRAGMATIC_FUNCTION_FUNCTIONS:
log.log("Warning. Pragmatic function %s is invalid. Using %s (line %d)" % (MISSING_VALUE, pra, lineNumber))
log.log(f"Warning. Pragmatic function {pra} is invalid. Using {MISSING_VALUE} (line {lineNumber})")
return MISSING_VALUE

return pra
Expand All @@ -261,22 +259,22 @@ def _parseDad(self, dadContent, lineNumber):
return dads

def __init__(self, chaFilePath,
SPEAKER_IGNORE = [ SPEAKER_SILENCE ], USE_TIERS = [ TIER_MOR ],
SPEAKER_IGNORE = [ SPEAKER_SILENCE ], TIER_IGNORE = [],
CDS_ONLY = False, VERBOSE = False, language = None):
"""Constructor. Loads the CHA file and parse it
Args:
chaFilePath (string): Path to the CHA file
SPEAKER_IGNORE (list, optional): Utterances from these speakers won't be parsed. Defaults to [ SPEAKER_SILENCE ].
USE_TIERS (list, optional): Tiers to include when parsing the CHA file. Defaults to [ TIER_MOR ].
TIER_IGNORE (list, optional): Tiers to exclude when parsing the CHA file. Defaults to [].
CDS_ONLY (bool, optional): Only child directed utterances will be parsed. Defaults to False.
VERBOSE (bool, optional): Extra information will be printed when processing. Defaults to False.
language (string, optional): Use one of the LANGUAGE constants or None for parsing it from the CHA file. Defaults to None.
"""

self.chaFilePath = chaFilePath
self.SPEAKER_IGNORE = SPEAKER_IGNORE
self.USE_TIERS = USE_TIERS
self.TIER_IGNORE = TIER_IGNORE
self.CDS_ONLY = CDS_ONLY
self.VERBOSE = VERBOSE

Expand Down Expand Up @@ -398,99 +396,138 @@ def processLines(self):
Raises:
FileNotFoundError: The path to the CHA file does not exist
IOError: kwal command not found. Is CLAN_BIN_PATH properly set ?
IOError: MOR tier not found
"""
INDEX_CONTENIDO = 13
INDEX_LINENUMBER = 12
INDEX_SPEAKER = 3

strTiers = " "
for t in self.USE_TIERS:
strTiers += "+t%s " % ("%" + t)

if not os.path.isfile(self.chaFilePath):
raise FileNotFoundError()

# La primera línea deberia andar pero no anda desde spyder y no puedo debuggear
# entonces hago la segunda (que no creo que ande en Windows)
# exitcode, output = getstatusoutput( os.path.join(CLAN_BIN_PATH, "kwal") + " +d4 -f " + strTiers + " \"" + self.chaFilePath + "\" " )
command = "cat \"%s\" | %s +d4 -f %s" % (self.chaFilePath, os.path.join(CLAN_BIN_PATH, "kwal"), strTiers)
if self.VERBOSE:
log.log(command)

exitcode, output = getstatusoutput( command )

if exitcode != 0:
log.log("CLAN's kwal command failed. Command was:\n %s" % command)
log.log("Output: \n %s \n\n (...) \n\n %s" % ( output[0:1000], output[-1000:] ))
raise IOError()

try:
xmlFrom = output.find("<?xml")
xmlOutput = minidom.parseString(output[ xmlFrom: ])
except:
if MOR_ERROR_NO_MOR_FOUND in output:
raise IOError(ERROR_NO_MOR_FOUND)
else :
log.log("Error parsing CLAN's kwal XML output. Command was:\n %s" % command)
log.log("Output: \n %s \n\n (...) \n\n %s" % ( output[0:1000], output[-1000:] ))
raise
with open(self.chaFilePath,"r") as f:
txtCHA = f.read()

prog = re.compile(r"[\*%@]\w*:\t.*?(?=\*\w*:\t|@End|\Z)", re.S)#, re.X)
parsedCHA = prog.findall(txtCHA)

self.lines = []
self.speakers = []
languages = None
lineNumber = 1

for r in parsedCHA:
#is header
if r[0] == "@":
lineNumber += r.count("\n") + 2 #2= utf8 and begin
headerMatch = re.match(r"@Languages:\s(.*)", r)
if headerMatch:
languages = headerMatch.group(1)
continue

prog = re.compile(r"(?P<tier>[\*%][\w-]*):[\s]*(?P<content>.*?)(?=[\*%][\w-]*:[\s]*|@End|\Z)", re.S)

line = {
LINE_NUMBER : lineNumber
}

skipLine = False

#build line
for m in prog.finditer(r):
tier = m.group("tier")
content = m.group("content").replace("\t"," ").replace("\n", "")

if m.group("tier")[0] == "*": #is speaker
speaker = tier[1:]
if not speaker in self.SPEAKER_IGNORE:
if not speaker in self.speakers:
self.speakers.append(speaker)

line[LINE_SPEAKER] = speaker
line[LINE_UTTERANCE] = content

if BULLET_TAG in content:
regexBullet = r"\x15(?P<from>\d*)_(?P<to>\d*)\x15"
progBullet = re.compile(regexBullet)#, re.S)
parsedBullet = list(progBullet.finditer(content))

if len(parsedBullet) > 0 :
bulletFrom = int(parsedBullet[0].group("from"))
bulletTo = int(parsedBullet[-1].group("to"))
line[ LINE_BULLET ] = [bulletFrom, bulletTo]
line[ LINE_UTTERANCE ] = progBullet.sub("", line[ LINE_UTTERANCE ])
else:
#esto sucede cuando los bullets son de la forma %snd:"filename"_from_to
line[ LINE_UTTERANCE ] = line[ LINE_UTTERANCE ].replace(BULLET_TAG, "").strip()
else:
skipLine = True
break
else:
tierName = tier[1:]
line[tierName] = content

if tierName == "mor":
self.morFound = True

#no es realmente un tier
#esto sucede cuando los bullets son de la forma %snd:"filename"_from_to
if tierName == "snd":
del line[tierName]
lstContent = content.replace(BULLET_TAG, "").split("_")
line[ LINE_BULLET ] = [int(lstContent[-2]), int(lstContent[-1])]

if hasattr(self, f"_parse{tierName.capitalize()}" ):
tierProcessFunction = getattr(self, f"_parse{tierName.capitalize()}" )
line[tierName] = tierProcessFunction( line[tierName], line[LINE_NUMBER] )


if not skipLine:
self._setAddressee(line)

for row in xmlOutput.getElementsByTagName("Row"):
rowCells = row.getElementsByTagName("Cell")

lineNumber = rowCells[INDEX_LINENUMBER].firstChild.firstChild.data
content = rowCells[INDEX_CONTENIDO].firstChild.firstChild.data
speaker = rowCells[INDEX_SPEAKER].firstChild.firstChild.data

# In some cases speaker is not being parsed correctly
if speaker[0] == "*":
speaker = speaker[1:]
if not (self.CDS_ONLY and line[LINE_ADDRESSEE] not in [SPEAKER_TARGET_CHILD, SPEAKER_BOTH]):
if not speaker in self.SPEAKER_IGNORE:
self.lines.append(line)

lineNumber += r.count("\n")

if speaker in self.SPEAKER_IGNORE:
continue
if not speaker in self.speakers:
self.speakers.append(speaker)
#if MOR is found on file all lines should have at least an empty TIER_MOR
if self.morFound:
for l in self.getLines():
if TIER_MOR not in l:
l[TIER_MOR] = []
####

strLine = content[6:]

line = {
LINE_UTTERANCE : strLine,
LINE_NUMBER : int(lineNumber),
LINE_SPEAKER : speaker,
LINE_ADDRESSEE : None
}
# line = {
# LINE_UTTERANCE : strLine,
# LINE_NUMBER : int(lineNumber),
# LINE_SPEAKER : speaker,
# LINE_ADDRESSEE : None
# }

if BULLET_TAG in strLine:
bullet = strLine[ strLine.find(BULLET_TAG): ].replace(BULLET_TAG,"").split("_")
line[ LINE_BULLET ] = [ int(bullet[-2]), int(bullet[-1]) ]
line[ LINE_UTTERANCE ] = strLine[ :strLine.find(BULLET_TAG)].strip()
self.noBullets = False
# if BULLET_TAG in strLine:
# bullet = strLine[ strLine.find(BULLET_TAG): ].replace(BULLET_TAG,"").split("_")
# line[ LINE_BULLET ] = [ int(bullet[-2]), int(bullet[-1]) ]
# line[ LINE_UTTERANCE ] = strLine[ :strLine.find(BULLET_TAG)].strip()
# self.noBullets = False

for tier in self.USE_TIERS:
line[tier] = MISSING_VALUE
# for tier in self.USE_TIERS:
# line[tier] = MISSING_VALUE

tiers = []
for i in range(INDEX_CONTENIDO+1, len(rowCells)):
tierContent = rowCells[i].firstChild.firstChild.data
tiers.append( tierContent )
# tiers = []
# for i in range(INDEX_CONTENIDO+1, len(rowCells)):
# tierContent = rowCells[i].firstChild.firstChild.data
# tiers.append( tierContent )

for tier in tiers:
tierName = tier[1:4]
line[tierName] = tier[5:].strip()
# for tier in tiers:
# tierName = tier[1:4]
# line[tierName] = tier[5:].strip()

if hasattr(self, "_parse%s" % tierName.capitalize() ):
tierProcessFunction = getattr(self, "_parse%s" % tierName.capitalize() )
line[tierName] = tierProcessFunction( line[tierName], line[LINE_NUMBER] )
# if hasattr(self, "_parse%s" % tierName.capitalize() ):
# tierProcessFunction = getattr(self, "_parse%s" % tierName.capitalize() )
# line[tierName] = tierProcessFunction( line[tierName], line[LINE_NUMBER] )

self._setAddressee(line)


if not (self.CDS_ONLY and line[LINE_ADDRESSEE] not in [SPEAKER_TARGET_CHILD, SPEAKER_BOTH ]):
self.lines.append(line)
# if not (self.CDS_ONLY and line[LINE_ADDRESSEE] not in [SPEAKER_TARGET_CHILD, SPEAKER_BOTH ]):
# self.lines.append(line)

def _setAddressee(self, line):
"""Internal use. Set normalized addressee
Expand Down Expand Up @@ -565,7 +602,7 @@ def countWordsByAddressee(self):
Returns:
dict: Number of words grouped by addresee
"""
assert "mor" in self.USE_TIERS, "mor tier has to be selected for usage to use this function"
assert self.morFound, "MOR tier not found"
addressees = {}

for l in self.getLines():
Expand All @@ -588,7 +625,7 @@ def countWordsInLine(self, line):
Returns:
int: Number of words in the utterance
"""
assert "mor" in self.USE_TIERS, "mor tier has to be selected for usage to use this function"
assert self.morFound, "MOR tier not found"
dontCount = ["cm"] #no cuentes la coma

c = 0
Expand Down Expand Up @@ -629,7 +666,8 @@ def getNounsInLine(self, linea):
Returns:
list: List of indexes
"""
assert "mor" in self.USE_TIERS, "mor tier has to be selected for usage to use this function"
assert self.morFound, "MOR tier not found"

nouns = []

if linea[TIER_MOR] != MISSING_VALUE:
Expand All @@ -642,7 +680,7 @@ def getNounsInLine(self, linea):
def populateNouns(self):
"""Populate LINE_NOUNS for every line with the indexes of the MOR line where nouns are found
"""
assert "mor" in self.USE_TIERS, "mor tier has to be selected for usage to use this function"
assert self.morFound, "MOR tier not found"

for linea in self.getLines():
linea[LINE_NOUNS] = self.getNounsInLine(linea)
Expand Down Expand Up @@ -679,7 +717,7 @@ def getAdjectivesInLine(self, linea):
Returns:
list: List of indexes
"""
assert "mor" in self.USE_TIERS, "mor tier has to be selected for usage to use this function"
assert self.morFound, "MOR tier not found"
adjetivos = []

if linea[TIER_MOR] != MISSING_VALUE:
Expand All @@ -692,7 +730,7 @@ def getAdjectivesInLine(self, linea):
def populateAdjectives(self):
"""Populate LINE_ADJECTIVES of every line with the indexes of the MOR tier where adjectives are found
"""
assert "mor" in self.USE_TIERS, "mor tier has to be selected for usage to use this function"
assert self.morFound, "MOR tier not found"

for linea in self.getLines():
linea[LINE_ADJECTIVES] = self.getAdjectivesInLine(linea)
Expand Down Expand Up @@ -951,7 +989,7 @@ def findLinesByMorCriteria(self, criteria, criteriaType=MOR_UNIT_CATEGORIA):
"""
results = []

assert "mor" in self.USE_TIERS, "mor tier has to be selected for usage to use this function"
assert self.morFound, "MOR tier not found"
assert isinstance( criteria, list ) and len(criteria) > 0 and isinstance( criteria[0], list ), "invalid criteria, expected: [ [], ...]"

for line in self.lines:
Expand Down
8 changes: 3 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

Class for parsing [CLAN's](http://dali.talkbank.org/clan/) CHA file.

Made by Leandro Garber from [CIIPME-CONICET](http://www.ciipme-conicet.gov.ar/wordpress/)
Made by Leandro Garber from [CIIPME-CONICET](http://www.ciipme-conicet.gov.ar/)

## Features
* Utterances as a list of strings
Expand All @@ -17,8 +17,6 @@ Made by Leandro Garber from [CIIPME-CONICET](http://www.ciipme-conicet.gov.ar/wo

## Usage

Make a symlink (ln -s) to CLAN's bin directory called "clanBin".

### Import
```python
import sys
Expand All @@ -35,7 +33,7 @@ cha = ChaFile(<path_to_cha_file>)
Options

```python
cha = ChaFile(<path_to_cha_file>, <list_of_ignored_speakers> = [ SPEAKER_SILENCE ], <list_of_tiers> = [ TIER_MOR ], CDS_ONLY = False )
cha = ChaFile(<path_to_cha_file> )
```
### Get utterances
```python
Expand All @@ -48,7 +46,7 @@ Each line is an object with:
* LINE_ADDRESSEE
* LINE_BULLET : Timestamp
* TIER_MOR : A list of objects with MOR data: MOR_UNIT_LEXEMA and MOR_UNIT_CATEGORIA
* ... any other tier you selected when instanced
* ... any other tier

### Cite

Expand Down

0 comments on commit aa34b53

Please sign in to comment.