Skip to content

Commit

Permalink
custom schemas for frame files
Browse files Browse the repository at this point in the history
  • Loading branch information
cklunch committed Oct 24, 2024
1 parent edf71cd commit 69daf58
Show file tree
Hide file tree
Showing 4 changed files with 125 additions and 19 deletions.
Binary file modified dist/neonutilities-1.0.1-py3-none-any.whl
Binary file not shown.
Binary file modified dist/neonutilities-1.0.1.tar.gz
Binary file not shown.
35 changes: 35 additions & 0 deletions src/neonutilities/__resources__/frame_file_variables.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
table,fieldName,description,dataType,units,downloadPkg,pubFormat
MCC,dnaSampleID,Identifier for DNA sample,string,NA,expanded,asIs
MCC,dnaSampleCode,Barcode of a DNA sample,string,NA,expanded,asIs
MCC,sequenceName,Name associated with the sequence,string,NA,expanded,asIs
MCC,taxonSequence,Sequence associated with the taxon,string,NA,expanded,asIs
MCC,completeTaxonomy,Full taxonomic hierarchy for identified organism,string,NA,expanded,asIs
MCC,domain,The scientific name of the domain in which the taxon is classified,string,NA,expanded,asIs
MCC,kingdom,The scientific name of the kingdom in which the taxon is classified,string,NA,expanded,asIs
MCC,phylum,The scientific name of the phylum or division in which the taxon is classified,string,NA,expanded,asIs
MCC,class,The scientific name of the class in which the taxon is classified,string,NA,expanded,asIs
MCC,order,The scientific name of the order in which the taxon is classified,string,NA,expanded,asIs
MCC,family,The scientific name of the family in which the taxon is classified,string,NA,expanded,asIs
MCC,genus,The scientific name of the genus in which the organism is classified,string,NA,expanded,asIs
MCC,specificEpithet,The specific epithet (second part of the species name) of the scientific name applied to the taxon,string,NA,expanded,asIs
MCC,scientificName,"Scientific name, associated with the taxonID. This is the name of the lowest level taxonomic rank that can be determined",string,NA,expanded,asIs
MCC,individualCount,Number of individuals of the same type,integer,NA,expanded,integer
MCC,fileName,"Name of file, including file extension",string,NA,expanded,asIs
REA,hoboSampleID,Unique identifier for the HOBO conductivity logger file,string,NA,expanded,asIs
REA,hoboSampleCode,Barcode of the HOBO conductivity logger file,string,NA,expanded,asIs
REA,measurementNumber,The number of the measurement in a time series,integer,NA,expanded,integer
REA,dateTimeLogger,Local date and time returned by a field data logger,dateTime,NA,expanded,asIs
REA,lowRangeHobo,Conductivity returned from a HOBO logger for the low range,real,microsiemensPerCentimeter,expanded,asIs
REA,fullRangeHobo,Conductivity from a HOBO logger for the full range,real,microsiemensPerCentimeter,expanded,asIs
REA,waterTemp,Temperature of water (C),real,celsius,expanded,asIs
REA,fullRangeSpCondLinear,Specific conductance calculated using linear method and fullRangeHobo,real,microsiemensPerCentimeter,expanded,asIs
REA,lowRangeSpCondLinear,Specific conductance calculated using linear method and lowRangeHobo,real,microsiemensPerCentimeter,expanded,asIs
REA,fullRangeSpCondNonlinear,Specific conductance calculated using non-linear method and fullRangeHobo,real,microsiemensPerCentimeter,expanded,asIs
REA,lowRangeSpCondNonlinear,Specific conductance calculated using non-linear method and lowRangeHobo,real,microsiemensPerCentimeter,expanded,asIs
REA,fileName,"Name of file, including file extension",string,NA,expanded,asIs
FSP,spectralSampleID,Identifier for a spectral sample,string,NA,expanded,asIs
FSP,spectralSampleCode,Barcode of a spectral sample,string,NA,expanded,asIs
FSP,wavelength,Wavelength of measurement,real,nanometer,expanded,asIs
FSP,reflectanceCondition,Conditions under which reflectance measurement was made,string,NA,expanded,asIs
FSP,reflectance,Reflectance of sample,real,proportion,expanded,asIs
FSP,fileName,"Name of file, including file extension",string,NA,expanded,asIs
109 changes: 90 additions & 19 deletions src/neonutilities/unzip_and_stack.py
Original file line number Diff line number Diff line change
Expand Up @@ -484,6 +484,77 @@ def sort_dat(pdata):
return(pdata)


def stack_frame_files(framefiles, dpid,
                      seqtyp=None,
                      cloud_mode=False):
    """
    Helper function to stack "data frame" files. These files do not go through the normal
    publication process, they are stored and published as a fixed unit. NEON uses this
    workflow for very large tabular files that can't be handled by the standard OS
    data pipeline.

    Parameters
    --------
    framefiles: A list of filepaths pointing to the data frame files
    dpid: The data product identifier of the product being stacked
    seqtyp: For microbe community data, sequence type 16S or ITS
    cloud_mode: Use cloud mode to transfer files cloud-to-cloud? Defaults to False.

    Return
    --------
    A dictionary with two entries: "frmdat" is the stacked pandas table, and
    "frmnm" is the name to use for that table. Products without an entry in the
    name lookup below are named "per_sample".

    Created on 24 Oct 2024

    @author: Claire Lunch
    """

    # data product ID -> value of the "table" column in frame_file_variables.csv
    fdict = {"DP1.30012.001": "FSP", "DP1.10081.001": "MCC",
             "DP1.20086.001": "MCC", "DP1.20141.001": "MCC",
             "DP1.20190.001": "REA", "DP1.20193.001": "REA"}

    # data product ID -> name of the stacked table
    fnames = {"DP1.20190.001": "rea_conductivityRawData",
              "DP1.20193.001": "sbd_conductivityRawData",
              "DP1.30012.001": "fsp_rawSpectra",
              "DP1.10081.001": f"mcc_soilPerSampleTaxonomy_{seqtyp}",
              "DP1.20086.001": f"mcc_benthicPerSampleTaxonomy_{seqtyp}",
              "DP1.20141.001": f"mcc_surfaceWaterPerSampleTaxonomy_{seqtyp}"}

    # no variables files for these, use custom files in package resources.
    # Products with no custom schema keep fschema=None so arrow infers the
    # column types instead of failing on the lookup.
    fschema = None
    if dpid in fdict:
        frame_file_file = (importlib_resources.files(__resources__)
                           / "frame_file_variables.csv")
        frame_file_variables = pd.read_csv(frame_file_file, index_col=None)

        # keep only the variable definitions for this product's table
        fvars = pa.Table.from_pandas(frame_file_variables)
        ftab = fvars.filter(pa.compute.field("table") == fdict[dpid])

        fpkgvar = ftab.to_pandas()
        fschema = get_variables(fpkgvar)

    if cloud_mode:
        gcs = fs.GcsFileSystem(anonymous=True)
        # strip the URL prefix from each file URL before handing it to the
        # GCS filesystem
        framebuckets = [re.sub(pattern="https://storage.neonscience.org/",
                               repl="", string=b) for b in framefiles]
        fdat = dataset.dataset(source=framebuckets, filesystem=gcs,
                               format="csv", schema=fschema)
    else:
        fdat = dataset.dataset(source=framefiles, format="csv",
                               schema=fschema)

    fdattab = fdat.to_table()
    fpdat = fdattab.to_pandas()

    nm = fnames.get(dpid, "per_sample")

    return {"frmdat": fpdat, "frmnm": nm}


def format_readme(readmetab,
tables):
"""
Expand Down Expand Up @@ -545,7 +616,7 @@ def stack_data_files_parallel(folder,
package: basic or expanded data package
dpid: Data product ID of product to stack.
progress: Should a progress bar be displayed?
cloud_mode: cloud_mode: Use cloud mode to transfer files cloud-to-cloud? If used, stack_by_table() expects a list of file urls as input. Defaults to False.
cloud_mode: Use cloud mode to transfer files cloud-to-cloud? If used, stack_by_table() expects a list of file urls as input. Defaults to False.
Return
--------
Expand Down Expand Up @@ -579,29 +650,29 @@ def stack_data_files_parallel(folder,
framefiles = [f for f in filepaths if not os.path.basename(f).startswith("NEON.")]
filepaths = [f for f in filepaths if os.path.basename(f).startswith("NEON.")]
filenames = [f for f in filenames if os.path.basename(f).startswith("NEON.")]

# stack frame files
if progress:
logging.info("Stacking per-sample files. These files may be very large; download data in smaller subsets if performance problems are encountered.\n")

# no variables files for these, have to let arrow infer. problem?
if cloud_mode:
framebuckets = [re.sub(pattern="https://storage.neonscience.org/",
repl="", string=b) for b in framefiles]
fdat = dataset.dataset(source=framebuckets, filesystem=gcs,
format="csv")
else:
fdat = dataset.dataset(source=framefiles, format="csv")

fdattab = fdat.to_table()
fpdat = fdattab.to_pandas()

if dpid == "DP1.20190.001":
stacklist["rea_conductivityRawData"] = fpdat
elif dpid == "DP1.20193.001":
stacklist["sbd_conductivityRawData"] = fpdat
# subset microbe community data by taxonomic group
# and stack both sets
if dpid in ["DP1.10081.001", "DP1.20086.001","DP1.20141.001"]:
bacteriafiles = [b for b in framefiles if re.search("[_]16S[_]", b)]
fungifiles = [b for b in framefiles if re.search("[_]ITS[_]", b)]

fpdat16 = stack_frame_files(bacteriafiles, dpid=dpid,
seqtyp="16S", cloud_mode=cloud_mode)
fpdatIT = stack_frame_files(fungifiles, dpid=dpid,
seqtyp="ITS", cloud_mode=cloud_mode)

stacklist[fpdat16["frmnm"]] = fpdat16["frmdat"]
stacklist[fpdatIT["frmnm"]] = fpdatIT["frmdat"]

else:
stacklist["per_sample"] = fpdat
fpdat = stack_frame_files(framefiles, dpid=dpid, seqtyp=None,
cloud_mode=cloud_mode)
stacklist[fpdat["frmnm"]] = fpdat["frmdat"]

# make a dictionary, where filenames are the keys to the filepath values
filelist = dict(zip(filenames, filepaths))
Expand Down

0 comments on commit 69daf58

Please sign in to comment.