Skip to content

Commit

Permalink
vcf2arrow transforms variants into KEY=VALUE form
Browse files Browse the repository at this point in the history
  • Loading branch information
kaigai committed Jan 20, 2025
1 parent c3451df commit 9d778a8
Showing 1 changed file with 95 additions and 92 deletions.
187 changes: 95 additions & 92 deletions arrow-tools/vcf2arrow.c
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ static const char **user_metadata_values = NULL;
static SQLtable *vcf_table = NULL;
static bool force_write_if_exists = false;
static bool verbose_mode = false;
static bool by_raw_format = false;
static bool sort_by_pos = false;
static bool shows_progress = false;

Expand All @@ -56,9 +57,8 @@ static int variantNames_nrooms = 0;
#define VCF_ARROW_ANUM__QUAL 5
#define VCF_ARROW_ANUM__FILTER 6
#define VCF_ARROW_ANUM__INFO 7
#define VCF_ARROW_ANUM__FORMAT 8
#define VCF_ARROW_ANUM__VARIANT 9
#define VCF_ARROW_NUM_FIELDS (VCF_ARROW_ANUM__VARIANT+1)
#define VCF_ARROW_ANUM__FORMAT 8 /* only if --with-raw-format */
#define VCF_ARROW_NUM_BASE_FIELDS 8

static struct {
ArrowTypeTag type;
Expand All @@ -74,7 +74,6 @@ static struct {
{ArrowNodeTag__Utf8, "filter", VCF_ARROW_ANUM__FILTER},
{ArrowNodeTag__Utf8, "info", VCF_ARROW_ANUM__INFO},
{ArrowNodeTag__Utf8, "format", VCF_ARROW_ANUM__FORMAT},
{ArrowNodeTag__Utf8, "variant_value", VCF_ARROW_ANUM__VARIANT},
{-1,NULL},
};

Expand Down Expand Up @@ -669,16 +668,18 @@ static SQLtable *
__build_vcf_table_buffer(void)
{
SQLtable *table;
int nfields;

/*
* For the multi-variable VCF files, we have secondary (or more) buffer
* after the vcf_table->columns[10], and switch them when we write out
* buffered values to Apache Arrow.
*/
assert(variantNames_nitems > 0);
table = palloc0(offsetof(SQLtable, columns[VCF_ARROW_ANUM__VARIANT +
variantNames_nitems]));
for (int j=0; j < VCF_ARROW_NUM_FIELDS; j++)
nfields = VCF_ARROW_NUM_BASE_FIELDS;
if (by_raw_format)
nfields++;
table = palloc0(offsetof(SQLtable, columns[nfields + variantNames_nitems]));
for (int j=0; j < nfields; j++)
{
const char *arrow_name = vcf_arrow_column_defs[j].name;
ArrowTypeTag arrow_type = vcf_arrow_column_defs[j].type;
Expand All @@ -702,15 +703,25 @@ __build_vcf_table_buffer(void)
}
}
/* extend the "variant" field if multiple fields */
for (int j=1; j < variantNames_nitems; j++)
for (int j=0; j < variantNames_nitems; j++)
{
SQLfield *column = &table->columns[VCF_ARROW_ANUM__VARIANT+j];
const char *name = vcf_arrow_column_defs[VCF_ARROW_ANUM__VARIANT].name;
SQLfield *column = &table->columns[nfields+j];
const char *name = variantNames[j];
ArrowKeyValue *kv;

__setup_vcf_column_utf8_buffer(table, column, name);
table->numBuffers +=
__setup_vcf_column_utf8_buffer(table, column, name);
kv = palloc0(sizeof(ArrowKeyValue));
initArrowNode(kv, KeyValue);
kv->key = "variant_name";
kv->value = name;
kv->_key_len = strlen(kv->key);
kv->_value_len = strlen(kv->value);
column->customMetadata = kv;
column->numCustomMetadata = 1;
}
table->nfields = VCF_ARROW_NUM_FIELDS;
table->numFieldNodes = VCF_ARROW_NUM_FIELDS; /* no nested fields */
table->nfields = nfields + variantNames_nitems;
table->numFieldNodes = nfields + variantNames_nitems;
table->has_statistics = true;
/* custom-metadata */
table->customMetadata = customMetadata;
Expand Down Expand Up @@ -785,45 +796,14 @@ static void setup_vcf_table_buffer(void)
* __flush_vcf_arrow_file
*/
static void
__flush_vcf_arrow_file(SQLtable *table, StringInfo buf)
__flush_vcf_arrow_file(SQLtable *table)
{
SQLfield variant_column_saved;
SQLstat stat_datum_saved;

if (table->nitems == 0)
return; /* skip */
ArrowBlock __block;
int __rb_index;

for (int k=0; k < variantNames_nitems; k++)
if (table->nitems > 0)
{
ArrowBlock __block;
int __rb_index;

if (k == 0)
{
/*
* NOTE: ad-hoc hack - writeArrowRecordBatch clears the stat_datum,
* however, it must be reused for each variants in vcf2arrow.
*/
memcpy(&stat_datum_saved,
&table->columns[VCF_ARROW_ANUM__POS].stat_datum,
sizeof(SQLstat));
__rb_index = writeArrowRecordBatch(table, &__block);
memcpy(&variant_column_saved,
&table->columns[VCF_ARROW_ANUM__VARIANT], sizeof(SQLfield));
}
else
{
memcpy(&table->columns[VCF_ARROW_ANUM__POS].stat_datum,
&stat_datum_saved,
sizeof(SQLstat));
memcpy(&table->columns[VCF_ARROW_ANUM__VARIANT],
&table->columns[VCF_ARROW_ANUM__VARIANT+k], sizeof(SQLfield));
__rb_index = writeArrowRecordBatch(table, &__block);
}
if (buf->len != 0)
appendStringInfo(buf, ",");
appendStringInfo(buf, "%s", variantNames[k]);

__rb_index = writeArrowRecordBatch(table, &__block);
if (shows_progress)
{
time_t tv = time(NULL);
Expand All @@ -847,50 +827,85 @@ __flush_vcf_arrow_file(SQLtable *table, StringInfo buf)
table->nitems);
}
}
if (variantNames_nitems > 1)
memcpy(&table->columns[VCF_ARROW_ANUM__VARIANT],
&variant_column_saved, sizeof(SQLfield));
/* clear the buffer */
for (int k=1; k < variantNames_nitems; k++)
sql_field_clear(&vcf_table->columns[VCF_ARROW_ANUM__VARIANT+k]);
sql_table_clear(vcf_table);
}

/*
* __convert_vcf_variant
*/
static size_t
__convert_vcf_variant(SQLfield *column, const char *__format, char *variant)
{
char *format = NULL;
char *buffer = NULL;
char *tok1, *pos1;
char *tok2, *pos2;
size_t off = 0;

if (__format && variant)
{
buffer = alloca(strlen(__format) + strlen(variant) + 512);
format = alloca(strlen(__format) + 100);
strcpy(format, __format);

for (tok1 = strtok_r(format, ":", &pos1),
tok2 = strtok_r(variant, ":", &pos2);
tok1 != NULL && tok2 != NULL;
tok1 = strtok_r(NULL, ":", &pos1),
tok2 = strtok_r(NULL, ":", &pos2))
{
if (off > 0)
buffer[off++] = ':';
off += sprintf(buffer+off, "%s=%s", tok1, tok2);
}
buffer[off++] = '\0';
}
return sql_field_put_value(column, buffer, -1);
}

/*
* convert_vcf_file
*/
static void
convert_vcf_file(const char *fname, FILE *filp, long *p_lineno, StringInfo buf)
convert_vcf_file(const char *fname, FILE *filp, long *p_lineno)
{
char *line = NULL;
size_t bufsz = 0;
ssize_t nbytes;
int vcf_nattrs = VCF_ARROW_ANUM__VARIANT + variantNames_nitems;

while ((nbytes = getline(&line, &bufsz, filp)) > 0)
{
char *tok, *pos;
int anum;
size_t usage_base = 0;
bool flush_buffer = false;
char *tok, *pos;
const char *format = NULL;;
int anum, j=0;
size_t usage = 0;

(*p_lineno)++;
for (tok = __strtok(line, VCF_WHITESPACE, &pos), anum=0;
anum < vcf_nattrs;
j < vcf_table->nfields;
tok = __strtok(NULL, VCF_WHITESPACE, &pos), anum++)
{
SQLfield *column = &vcf_table->columns[anum];
size_t __usage;

__usage = sql_field_put_value(column, tok, -1);
if (anum < VCF_ARROW_ANUM__VARIANT)
usage_base += __usage;
else if (usage_base + __usage >= batch_segment_sz)
flush_buffer = true;
if (anum < VCF_ARROW_NUM_BASE_FIELDS)
{
usage += sql_field_put_value(&vcf_table->columns[j++], tok, -1);
}
else if (by_raw_format)
{
usage += sql_field_put_value(&vcf_table->columns[j++], tok, -1);
}
else if (!format)
{
format = tok;
}
else
{
usage += __convert_vcf_variant(&vcf_table->columns[j++], format, tok);
}
}
vcf_table->nitems++;
if (flush_buffer)
__flush_vcf_arrow_file(vcf_table, buf);
if (usage >= batch_segment_sz)
__flush_vcf_arrow_file(vcf_table);
}
if (line)
free(line);
Expand Down Expand Up @@ -920,6 +935,8 @@ static void usage(const char *format, ...)
" -E|--embedded-headers=HEADERS : comma separated header names list to be embedded.\n"
" (default: 'fileformat,reference,info,filter,format')\n"
" -m|--user-metadata=KEY=VALUE : a custom key-value pair to be embedded\n"
" --raw-format : saves format and variant columns in raw string.\n"
" (*) in default, it transformed to KEY=VALUE form.\n"
" --sort-by-pos : sort by the POS (optimization for min/max stats)\n"
" (*) note that this option preload entire VCF file once.\n"
" --progress : shows progress of VCF conversion.\n"
Expand All @@ -940,6 +957,7 @@ static void parse_options(int argc, char * const argv[])
{"segment-sz", required_argument, NULL, 's'},
{"embedded-headers", required_argument, NULL, 'E'},
{"user-metadata", required_argument, NULL, 'm'},
{"raw-format", no_argument, NULL, 1001},
{"sort-by-pos", no_argument, NULL, 1002},
{"progress", no_argument, NULL, 1003},
{"help", no_argument, NULL, 'h'},
Expand Down Expand Up @@ -1044,6 +1062,9 @@ static void parse_options(int argc, char * const argv[])
user_metadata_nitems++;
}
break;
case 1001: /* --raw-format */
by_raw_format = true;
break;
case 1002: /* --sort-by-pos */
sort_by_pos = true;
break;
Expand Down Expand Up @@ -1113,31 +1134,13 @@ static void parse_options(int argc, char * const argv[])
*/
int main(int argc, char * const argv[])
{
StringInfoData buf;

parse_options(argc, argv);
setup_vcf_table_buffer();
initStringInfo(&buf); /* metadata for each variant-key */
for (int i=0; i < input_file_nitems; i++)
convert_vcf_file(input_filename[i],
input_filedesc[i],
&input_fileline[i],
&buf);
__flush_vcf_arrow_file(vcf_table, &buf);
if (vcf_table->numRecordBatches > 0)
{
ArrowKeyValue *kv;

vcf_table->customMetadata = repalloc(vcf_table->customMetadata,
sizeof(ArrowKeyValue) *
(vcf_table->numCustomMetadata + 1));
kv = &vcf_table->customMetadata[vcf_table->numCustomMetadata++];
initArrowNode(kv, KeyValue);
kv->key = "variant_key";
kv->value = buf.data;
kv->_key_len = strlen(kv->key);
kv->_value_len = strlen(kv->value);
}
&input_fileline[i]);
__flush_vcf_arrow_file(vcf_table);
writeArrowFooter(vcf_table);
return 0;
}
Expand Down

0 comments on commit 9d778a8

Please sign in to comment.