Skip to content

Commit

Permalink
vcf2arrow: add vcf_variant_getattr() and vcf_info_getattr()
Browse files Browse the repository at this point in the history
  • Loading branch information
kaigai committed Jan 26, 2025
1 parent 9b53d6e commit 58d07c4
Show file tree
Hide file tree
Showing 6 changed files with 266 additions and 0 deletions.
105 changes: 105 additions & 0 deletions src/misc.c
Original file line number Diff line number Diff line change
Expand Up @@ -1964,6 +1964,111 @@ pgstrom_abort_if(PG_FUNCTION_ARGS)
PG_RETURN_VOID();
}

/*
* pgstrom_fetch_token_by_(colon|semicolon|comma)
*/
/*
 * __fetch_token_by_delim
 *
 * Looks up a "KEY=VALUE" token in a delimiter separated list and returns
 * a copy of VALUE as a newly palloc'ed text.
 *
 *   __str : the list to scan, e.g. "DP=10;AF=0.5" with delim=';'
 *   __key : the key to look for; whitespace at its head/tail is ignored
 *   delim : the character that separates the tokens
 *
 * Whitespace at the head of each token, and between KEY and '=', is
 * skipped. VALUE is copied as-is (never trimmed). The first matching
 * token wins. The last token does not need a trailing delimiter.
 *
 * Returns NULL if the key is empty (or whitespace only), the list is
 * empty, or no token has the form KEY[spaces]=VALUE.
 */
static text *
__fetch_token_by_delim(text *__str, text *__key, char delim)
{
	const char *str = VARDATA_ANY(__str);
	const char *key = VARDATA_ANY(__key);
	size_t		str_len = VARSIZE_ANY_EXHDR(__str);
	size_t		key_len = VARSIZE_ANY_EXHDR(__key);
	const char *tail;		/* one past the last byte of the list */
	const char *base;		/* head of the token currently scanned */
	const char *pos;

	/*
	 * Trim whitespace at the head/tail of the key. The cast to unsigned
	 * char is required: passing a negative char (e.g. a byte of a
	 * multibyte sequence) to isspace() is undefined.
	 */
	while (key_len > 0 && isspace((unsigned char) *key))
	{
		key++;
		key_len--;
	}
	while (key_len > 0 && isspace((unsigned char) key[key_len - 1]))
		key_len--;
	if (key_len == 0 || str_len == 0)
		return NULL;

	/*
	 * Walk the list; a token is closed either by the delimiter or by the
	 * end of the list. 'tail' is exclusive, so the final token keeps its
	 * last character, and *pos is never read when pos == tail.
	 */
	tail = str + str_len;
	base = str;
	for (pos = str; ; pos++)
	{
		bool		at_tail = (pos == tail);

		if (at_tail || *pos == delim)
		{
			/* [base, pos) is one complete token */
			if ((size_t) (pos - base) >= key_len &&
				strncmp(base, key, key_len) == 0)
			{
				const char *eq = base + key_len;

				/* bounds check first, then look at the byte */
				while (eq < pos && isspace((unsigned char) *eq))
					eq++;
				if (eq < pos && *eq == '=')
				{
					const char *val = eq + 1;
					size_t		len = pos - val;
					text	   *t = palloc(VARHDRSZ + len + 1);

					if (len > 0)
						memcpy(t->vl_dat, val, len);
					t->vl_dat[len] = '\0';
					SET_VARSIZE(t, VARHDRSZ + len);
					return t;
				}
			}
			if (at_tail)
				break;
			base = pos + 1;
		}
		else if (pos == base && isspace((unsigned char) *pos))
		{
			/* skip whitespace at the head of the token */
			base++;
		}
	}
	return NULL;
}

/*
 * pgstrom_fetch_token_by_colon
 *
 * SQL-callable entry point. Looks up the key (2nd argument) in the
 * colon separated list (1st argument) using __fetch_token_by_delim(),
 * and returns the fetched text, or SQL NULL if nothing was fetched.
 */
PG_FUNCTION_INFO_V1(pgstrom_fetch_token_by_colon);
PUBLIC_FUNCTION(Datum)
pgstrom_fetch_token_by_colon(PG_FUNCTION_ARGS)
{
	text	   *list = PG_GETARG_TEXT_PP(0);
	text	   *name = PG_GETARG_TEXT_PP(1);
	text	   *value;

	value = __fetch_token_by_delim(list, name, ':');
	if (value != NULL)
		PG_RETURN_POINTER(value);
	PG_RETURN_NULL();
}

/*
 * pgstrom_fetch_token_by_semicolon
 *
 * SQL-callable entry point. Looks up the key (2nd argument) in the
 * semicolon separated list (1st argument) using __fetch_token_by_delim(),
 * and returns the fetched text, or SQL NULL if nothing was fetched.
 */
PG_FUNCTION_INFO_V1(pgstrom_fetch_token_by_semicolon);
PUBLIC_FUNCTION(Datum)
pgstrom_fetch_token_by_semicolon(PG_FUNCTION_ARGS)
{
	text	   *list = PG_GETARG_TEXT_PP(0);
	text	   *name = PG_GETARG_TEXT_PP(1);
	text	   *value;

	value = __fetch_token_by_delim(list, name, ';');
	if (value != NULL)
		PG_RETURN_POINTER(value);
	PG_RETURN_NULL();
}

/*
 * pgstrom_fetch_token_by_comma
 *
 * SQL-callable entry point. Looks up the key (2nd argument) in the
 * comma separated list (1st argument) using __fetch_token_by_delim(),
 * and returns the fetched text, or SQL NULL if nothing was fetched.
 */
PG_FUNCTION_INFO_V1(pgstrom_fetch_token_by_comma);
PUBLIC_FUNCTION(Datum)
pgstrom_fetch_token_by_comma(PG_FUNCTION_ARGS)
{
	text	   *list = PG_GETARG_TEXT_PP(0);
	text	   *name = PG_GETARG_TEXT_PP(1);
	text	   *value;

	value = __fetch_token_by_delim(list, name, ',');
	if (value != NULL)
		PG_RETURN_POINTER(value);
	PG_RETURN_NULL();
}

/*
* Simple wrapper for read(2) and write(2) to ensure full-buffer read and
* write, regardless of i/o-size and signal interrupts.
Expand Down
15 changes: 15 additions & 0 deletions src/sql/pg_strom--5.1--5.3.sql
Original file line number Diff line number Diff line change
Expand Up @@ -416,3 +416,18 @@ CREATE FUNCTION pgstrom.fregr_syy(bytea)
RETURNS float8
AS 'MODULE_PATHNAME','pgstrom_regr_syy_final'
LANGUAGE C STRICT PARALLEL SAFE;

--- ================================================================
---
--- utility function related to vcf2arrow
---
--- ================================================================
--- vcf_variant_getattr(list text, key text)
---   Bound to the C symbol pgstrom_fetch_token_by_colon, which looks up
---   the token "key=value" in a ':'-separated list and returns the value.
---   STRICT: the C function is not invoked when an argument is NULL.
CREATE FUNCTION public.vcf_variant_getattr(text,text)
RETURNS text
AS 'MODULE_PATHNAME','pgstrom_fetch_token_by_colon'
LANGUAGE C STRICT PARALLEL SAFE;

--- vcf_info_getattr(list text, key text)
---   Bound to the C symbol pgstrom_fetch_token_by_semicolon, which looks up
---   the token "key=value" in a ';'-separated list and returns the value.
---   STRICT: the C function is not invoked when an argument is NULL.
CREATE FUNCTION public.vcf_info_getattr(text,text)
RETURNS text
AS 'MODULE_PATHNAME','pgstrom_fetch_token_by_semicolon'
LANGUAGE C STRICT PARALLEL SAFE;
17 changes: 17 additions & 0 deletions src/xpu_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,23 @@ __strcmp(const char *s1, const char *s2)
return c1 - c2;
}

/*
 * __strncmp
 *
 * Compares at most 'n' bytes of s1 and s2 as unsigned char values.
 * Returns the difference of the first pair of bytes that differ, or of
 * the pair at which s1 hits its NUL terminator; returns 0 if the first
 * 'n' bytes are all equal (or if 'n' is not positive).
 */
INLINE_FUNCTION(int)
__strncmp(const char *s1, const char *s2, int n)
{
	const unsigned char *p1 = (const unsigned char *) s1;
	const unsigned char *p2 = (const unsigned char *) s2;
	int			i;

	for (i = 0; i < n; i++)
	{
		unsigned char a = p1[i];
		unsigned char b = p2[i];

		/* stop at the first mismatch, or at the end of s1 */
		if (a != b || a == '\0')
			return a - b;
	}
	return 0;
}

/* ----------------------------------------------------------------
*
* Fundamental CUDA definitions
Expand Down
4 changes: 4 additions & 0 deletions src/xpu_opcodes.h
Original file line number Diff line number Diff line change
Expand Up @@ -806,6 +806,10 @@ __FUNC_OPCODE(cube_contains, cube/cube, 10, "cube")
__FUNC_OPCODE(cube_contained, cube/cube, 10, "cube")
__FUNC_OPCODE(cube_ll_coord, cube/int4, 10, "cube")

/*
 * vcf2arrow
 *
 * Opcode entries for vcf_variant_getattr(text,text) and
 * vcf_info_getattr(text,text).
 * NOTE(review): by analogy with the neighbouring entries, the 3rd argument
 * (10) is presumably the function cost and the 4th the name of the owning
 * extension -- confirm against the definition of __FUNC_OPCODE.
 */
__FUNC_OPCODE(vcf_variant_getattr, text/text, 10, "pg_strom")
__FUNC_OPCODE(vcf_info_getattr, text/text, 10, "pg_strom")

#undef TYPE_OPCODE
#undef TYPE_ALIAS
#undef FUNC_OPCODE
Expand Down
104 changes: 104 additions & 0 deletions src/xpu_textlib.cu
Original file line number Diff line number Diff line change
Expand Up @@ -1317,3 +1317,107 @@ pgfn_substr_nolen(XPU_PGFUNCTION_ARGS)
{
return pgfn_substring_nolen(kcxt, kexp, __result);
}

/*
* Functions related to vcf2arrow
*/
/*
 * __fetch_token_by_delim
 *
 * Looks up a "KEY=VALUE" token in a delimiter separated list. On a hit,
 * 'result' is set to refer to VALUE in place (result->value points into
 * 'str'; nothing is copied). Otherwise result->expr_ops is set to NULL.
 *
 *   str/strlen : the list to scan and its length in bytes
 *   key/keylen : the key to look for; whitespace at head/tail is ignored
 *   delim      : the character that separates the tokens
 *
 * Whitespace at the head of each token, and between KEY and '=', is
 * skipped. VALUE is never trimmed. The first matching token wins. The
 * last token does not need a trailing delimiter.
 */
STATIC_FUNCTION(void)
__fetch_token_by_delim(kern_context *kcxt,
					   xpu_text_t *result,
					   const char *str, int strlen,
					   const char *key, int keylen, char delim)
{
	const char *tail;		/* one past the last byte of the list */
	const char *base;		/* head of the token currently scanned */
	const char *pos;

	/*
	 * trimming whitespace at the head/tail of the key
	 */
	while (keylen > 0 && __isspace(*key))
	{
		key++;
		keylen--;
	}
	while (keylen > 0 && __isspace(key[keylen-1]))
		keylen--;
	if (keylen <= 0 || strlen <= 0)
		goto out;

	/*
	 * Walk the list; a token is closed either by the delimiter or by the
	 * end of the list. 'tail' is exclusive, so the final token keeps its
	 * last character, and *pos is never read when pos == tail.
	 */
	tail = str + strlen;
	base = str;
	for (pos = str; ; pos++)
	{
		bool		at_tail = (pos == tail);

		if (at_tail || *pos == delim)
		{
			/* [base, pos) is one complete token */
			if (pos - base >= keylen && __strncmp(base, key, keylen) == 0)
			{
				const char *eq = base + keylen;

				/* bounds check first, then look at the byte */
				while (eq < pos && __isspace(*eq))
					eq++;
				if (eq < pos && *eq == '=')
				{
					const char *val = eq + 1;

					result->expr_ops = &xpu_text_ops;
					result->value = val;
					result->length = (pos - val);
					return;
				}
			}
			if (at_tail)
				break;
			base = pos + 1;
		}
		else if (pos == base && __isspace(*pos))
		{
			/* skip whitespace at the head of the token */
			base++;
		}
	}
out:
	result->expr_ops = NULL;
}

/*
 * pgfn_vcf_variant_getattr
 *
 * Device-side vcf_variant_getattr(text,text): fetches the value of 'key'
 * from the ':'-separated list 'str' via __fetch_token_by_delim().
 * If either argument is NULL, result->expr_ops is set to NULL.
 * Returns false if xpu_text_is_valid() rejects either argument
 * (compressed or external datum), true otherwise.
 */
PUBLIC_FUNCTION(bool)
pgfn_vcf_variant_getattr(XPU_PGFUNCTION_ARGS)
{
	KEXP_PROCESS_ARGS2(text, text, str, text, key);

	/* NULL input: the result is NULL as well */
	if (XPU_DATUM_ISNULL(&str) || XPU_DATUM_ISNULL(&key))
	{
		result->expr_ops = NULL;
		return true;
	}
	/* compressed or external */
	if (!xpu_text_is_valid(kcxt, &str) ||
		!xpu_text_is_valid(kcxt, &key))
		return false;

	__fetch_token_by_delim(kcxt, result,
						   str.value, str.length,
						   key.value, key.length,
						   ':');
	return true;
}

/*
 * pgfn_vcf_info_getattr
 *
 * Device-side vcf_info_getattr(text,text): fetches the value of 'key'
 * from the ';'-separated list 'str' via __fetch_token_by_delim().
 * If either argument is NULL, result->expr_ops is set to NULL.
 * Returns false if xpu_text_is_valid() rejects either argument
 * (compressed or external datum), true otherwise.
 */
PUBLIC_FUNCTION(bool)
pgfn_vcf_info_getattr(XPU_PGFUNCTION_ARGS)
{
	KEXP_PROCESS_ARGS2(text, text, str, text, key);

	/* NULL input: the result is NULL as well */
	if (XPU_DATUM_ISNULL(&str) || XPU_DATUM_ISNULL(&key))
	{
		result->expr_ops = NULL;
		return true;
	}
	/* compressed or external */
	if (!xpu_text_is_valid(kcxt, &str) ||
		!xpu_text_is_valid(kcxt, &key))
		return false;

	__fetch_token_by_delim(kcxt, result,
						   str.value, str.length,
						   key.value, key.length,
						   ';');
	return true;
}
21 changes: 21 additions & 0 deletions src/xpu_textlib.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,4 +70,25 @@ xpu_bytea_is_valid(kern_context *kcxt, const xpu_bytea_t *arg)
return true;
}

/*
* functions in ctype.h
*/
/*
 * __isspace
 *
 * Counterpart of isspace(3) in the "C" locale: true for space,
 * horizontal tab, newline, vertical tab, form feed and carriage return.
 * Vertical tab and form feed are included so that the result agrees
 * with the standard isspace().
 */
INLINE_FUNCTION(bool)
__isspace(int c)
{
	return (c == ' '  || c == '\t' || c == '\n' ||
			c == '\v' || c == '\f' || c == '\r');
}

/*
 * __isupper
 *
 * True if 'c' is within the range 'A' ... 'Z'.
 */
INLINE_FUNCTION(bool)
__isupper(int c)
{
	return !(c < 'A' || c > 'Z');
}

/*
 * __islower
 *
 * True if 'c' is within the range 'a' ... 'z'.
 */
INLINE_FUNCTION(bool)
__islower(int c)
{
	return !(c < 'a' || c > 'z');
}

#endif /* XPU_TEXTLIB_H */

0 comments on commit 58d07c4

Please sign in to comment.