Skip to content

Commit

Permalink
[query][rfc-0008] permit more array INFO fields with missing elements
Browse files Browse the repository at this point in the history
CHANGELOG: Fixed hail-is#13346. Previously, when parsing VCFs, Hail failed on INFO fields with missing elements because the meaning of "." could be ambiguous. Hail now resovles the ambiguity, when possible, using the number of alleles. If the meaning is still ambiguous after considering the number of alleles, Hail uses a new `hl.import_vcf` parameter to resolve the ambiguity. See the `hl.import_vcf` docs for details.

See hail-is/hail-rfcs#8 for details on the problem and the solution.

I assessed the effect of removing the `array_elements_required=True` fast path by evaluating the
following code against this PR's tip commit `cd06c248e4` and `0.2.120` (`f00f916faf`). I ran it
three times per commit and report each individual time as well as the average.

```
In [1]: import hail as hl

In [2]: %%time
   ...: mt = hl.import_vcf(
   ...:     '/Users/dking/projects/hail-data/ALL.chr21.raw.HC.vcf.bgz'
   ...: )
   ...: mt._force_count_rows()
```

| commit                   | run 1 (s) | run 2 (s) | run 3 (s) | average (s) | warm average (s) |
|--------------------------|-----------|-----------|-----------|-------------|------------------|
| `cd06c248e4` (this PR)   | 116s      | 80s       | 77s       | 91+-18      | 78.5 +- 1.5      |
| `f00f916faf` (`0.2.120`) | 112s      | 80s       | 79s       | 90+-15      | 79.5 +- 0.5      |

This is what I expected. For a VCF with no ambiguity and few instances of ".", we've added a very
minor amount of new work.
  • Loading branch information
Dan King committed Aug 21, 2023
1 parent 14d6f9a commit ec88866
Show file tree
Hide file tree
Showing 5 changed files with 302 additions and 19 deletions.
152 changes: 135 additions & 17 deletions hail/python/hail/methods/impex.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from typing import Dict, Callable, Optional
import os
import re
from collections import defaultdict
Expand All @@ -8,18 +9,18 @@

import hail as hl
from hail import ir
from hail.expr import StructExpression, LocusExpression, \
expr_array, expr_float64, expr_str, expr_numeric, expr_call, expr_bool, \
expr_int32, to_expr, analyze
from hail.expr import (StructExpression, LocusExpression, expr_array, expr_float64, expr_str,
expr_numeric, expr_call, expr_bool, expr_int32, to_expr, analyze, Expression,
DictExpression, expr_any)
from hail.expr.types import hail_type, tarray, tfloat64, tstr, tint32, tstruct, \
tcall, tbool, tint64, tfloat32
from hail.genetics.reference_genome import reference_genome_type
from hail.ir.utils import parse_type
from hail.matrixtable import MatrixTable
from hail.methods.misc import require_biallelic, require_row_key_variant, require_col_key_str
from hail.table import Table
from hail.typecheck import typecheck, nullable, oneof, dictof, anytype, \
sequenceof, enumeration, sized_tupleof, numeric, table_key_type, char
from hail.typecheck import (typecheck, nullable, oneof, dictof, anytype, sequenceof, enumeration,
sized_tupleof, numeric, table_key_type, char, func_spec)
from hail.utils import new_temp_file
from hail.utils.deduplicate import deduplicate
from hail.utils.java import Env, FatalError, jindexed_seq_args, warning
Expand Down Expand Up @@ -2628,6 +2629,9 @@ def get_vcf_metadata(path):
return Env.backend().parse_vcf_metadata(path)



array_elements_required_nonce = object()

@typecheck(path=oneof(str, sequenceof(str)),
force=bool,
force_bgz=bool,
Expand All @@ -2637,15 +2641,16 @@ def get_vcf_metadata(path):
call_fields=oneof(str, sequenceof(str)),
reference_genome=nullable(reference_genome_type),
contig_recoding=nullable(dictof(str, str)),
array_elements_required=bool,
array_elements_required=oneof(bool, enumeration(array_elements_required_nonce)),
skip_invalid_loci=bool,
entry_float_type=enumeration(tfloat32, tfloat64),
filter=nullable(str),
find_replace=nullable(sized_tupleof(str, str)),
n_partitions=nullable(int),
block_size=nullable(int),
_create_row_uids=bool,
_create_col_uids=bool)
_create_col_uids=bool,
disambiguate_single_dot=nullable(dictof(str, func_spec(1, expr_any))))
def import_vcf(path,
force=False,
force_bgz=False,
Expand All @@ -2655,14 +2660,17 @@ def import_vcf(path,
call_fields=['PGT'],
reference_genome='default',
contig_recoding=None,
array_elements_required=True,
array_elements_required=array_elements_required_nonce,
skip_invalid_loci=False,
entry_float_type=tfloat64,
filter=None,
find_replace=None,
n_partitions=None,
block_size=None,
_create_row_uids=False, _create_col_uids=False,
*,
_create_row_uids=False,
_create_col_uids=False,
disambiguate_single_dot: Optional[Dict[str, Callable[[Expression], Expression]]] = None,
) -> MatrixTable:
"""Import VCF file(s) as a :class:`.MatrixTable`.
Expand Down Expand Up @@ -2779,12 +2787,11 @@ def import_vcf(path,
All contigs must be present in the `reference_genome`, so this is
useful for mapping differently-formatted data onto known references.
array_elements_required : :obj:`bool`
If ``True``, all elements in an array field must be present. Set this
parameter to ``False`` for Hail to allow array fields with missing
values such as ``1,.,5``. In this case, the second element will be
missing. However, in the case of a single missing element ``.``, the
entire field will be missing and **not** an array with one missing
element.
DEPRECATED: use `disambiguate_single_dot` instead. If ``True``, all elements in an array
field must be present. Set this parameter to ``False`` for Hail to allow array fields with
missing values such as ``1,.,5``. In this case, the second element will be missing. However,
in the case of a single missing element ``.``, the entire field will be missing and **not**
an array with one missing element.
skip_invalid_loci : :obj:`bool`
If ``True``, skip loci that are not consistent with `reference_genome`.
entry_float_type: :class:`.HailType`
Expand All @@ -2803,23 +2810,134 @@ def import_vcf(path,
are specified, `n_partitions` will be used.
block_size : :obj:`int`, optional
Block size, in MB. Default: 128MB blocks.
disambiguate_single_dot : :obj:`dict` of (:class:`str`, :class:`Callable`[[:class:`Expression`], :class:`Expression]), optional
User-specified expressions to interpret the meaning of an INFO or FORMAT array-field whose
value is ".". If specified, a dictionary mapping from INFO and/or FORMAT fields to
functions. Whenever a "." is encountered, the function for that field is called with the
current row as an argument. The returned value is used as the value of the field. If there
is no value for that field, an error is raised.
Returns
-------
:class:`.MatrixTable`
"""
if force:
hl.utils.warning(
warning(
f'You are trying to read {path} with *ONE* core of parallelism. This '
'will be very slow. If this file is block-gzipped (bgzip-ed), use '
'force_bgz=True instead.'
)

if array_elements_required is array_elements_required_nonce:
array_elements_required = False
else:
if array_elements_required is False:
warning(
f'"array_elements_required=False is deprecated. Please use "disambiguate_single_dot" instead.'
)

if disambiguate_single_dot is not None:
raise ValueError(
'Do not provide "array_elements_required" when providing '
'"disambiguate_single_dot". Provide only "disambiguate_single_dot".'
)

disambiguate_single_dot = disambiguate_single_dot or {}

reader = ir.MatrixVCFReader(path, call_fields, entry_float_type, header_file,
n_partitions, block_size, min_partitions,
reference_genome, contig_recoding, array_elements_required,
skip_invalid_loci, force_bgz, force, filter, find_replace)
return MatrixTable(ir.MatrixRead(reader, drop_cols=drop_samples, drop_row_uids=not _create_row_uids, drop_col_uids=not _create_col_uids))
mt = MatrixTable(
ir.MatrixRead(
reader,
drop_cols=drop_samples,
drop_row_uids=not _create_row_uids,
drop_col_uids=not _create_col_uids
)
)

if array_elements_required is True:
return mt.select_globals()

def disambiguate_field(field: str, value: Expression, attributes: DictExpression) -> Expression:
disambiguator = disambiguate_single_dot.get(field)

if field in {'AF', 'CIGAR', 'EC'}:
# VCFs may incorrectly encode these as ".". The VCF spec requires A.
field_number = 'A'
elif field in {'AD', 'ADF', 'ADR'}:
# VCFs may incorrectly encode these as ".". The VCF spec requires R.
field_number = 'R'
elif field in {'GL', 'GP', 'PL', 'PP'}:
# VCFs may incorrectly encode these as ".". The VCF spec requires G.
field_number = 'G'
else:
field_number = attributes[field]['Number']

if not isinstance(value.dtype, hl.tarray):
if disambiguator is not None:
raise ValueError(
f'Do not provide a disambiguator for the non-array INFO field {field}. It cannot be ambiguous.'
)
return value

isA = field_number == 'A'
isR = field_number == 'R'
isG = field_number == 'G'
isDot = field_number == '.'
n_alts = hl.len(mt.alleles) - 1
is_ambiguous = hl.any(
hl.all(isA, n_alts == 1),
hl.all(isR, n_alts == 0),
hl.all(isG, n_alts == 0),
isDot
)

cb = (
hl.case()
.when(hl.is_defined(value), value)
.when(~is_ambiguous, value)
)

if disambiguator is not None:
disambiguated_value = disambiguator(mt.row)
try:
return cb.default(disambiguated_value)
except TypeError as exc:
raise ValueError(
f'Disambiguator must have same type as field: field {field} had type {value.dtype} but disambiguator had type {disambiguated_value.dtype}.'
) from exc

return cb.or_error(
hl.format(
f'At row with key %s, INFO array field {field} had the value ".". '
f'This value is ambiguous. It could either be a missing array value or '
f'an array with one element which is a missing value. You must use the '
f'"disambiguate_single_dot" parameter of "import_vcf" to specify how to '
f'interpret "." for this field.',
mt.row_key
)
)

mt = mt.annotate_rows(info=hl.Struct(**{
row_field: disambiguate_field(
row_field,
mt.info[row_field],
mt.vcf_header.infoAttrs
)
for row_field in mt.info
}))
mt = mt.annotate_entries(**{
entry_field: disambiguate_field(
entry_field,
mt.entry[entry_field],
mt.vcf_header.formatAttrs
)
for entry_field in mt.entry
})
return mt.select_globals()


@typecheck(path=expr_str,
Expand Down
119 changes: 119 additions & 0 deletions hail/python/test/hail/methods/test_impex.py
Original file line number Diff line number Diff line change
Expand Up @@ -2176,3 +2176,122 @@ def test_import_export_same(i):

assert mt._same(mt2)
assert mt._same(mt3)


def test_import_vcf_13346():
mt = hl.import_vcf(resource(f'issue_13346.vcf'), reference_genome='GRCh38')
actual = mt.rows().collect()[0]
expected = hl.Struct(
locus=hl.Locus('chr1', 10403, reference_genome='GRCh38'),
alleles=['ACCCTAACCCTAACCCTAACCCTAACCCTAACCCTAAC', 'A', 'ACCCCTAACCCTAACCCTAACCCTAACCCTAACCCTAAC'],
rsid=None,
qual=-10.0,
filters={'LowQual'}
,
info=hl.Struct(
AC=[1, 1],
AF=[0.250, 0.250],
AN=4,
AS_QUALapprox='0|23|45',
AS_VQSLOD=[None, None],
AS_YNG=[None, None],
QUALapprox=45,
),
)
assert actual == expected


def test_import_vcf_number0_field_disambiguator_is_error():
with pytest.raises(ValueError, match='Do not provide a disambiguator for the non-array INFO field Number0. It cannot be ambiguous.'):
mt = hl.import_vcf(
resource(f'all_possible_ambiguous_info_array_fields.vcf'),
disambiguate_single_dot={'Number0': lambda x: 'hello'}
)


def test_import_vcf_number1_field_disambiguator_is_error():
with pytest.raises(ValueError, match='Do not provide a disambiguator for the non-array INFO field Number1. It cannot be ambiguous.'):
mt = hl.import_vcf(
resource(f'all_possible_ambiguous_info_array_fields.vcf'),
disambiguate_single_dot={'Number1': lambda x: 'hello'}
)


def test_import_vcf_ambiguous_field_defaults_to_error():
with pytest.raises(HailUserError, match='.*This value is ambiguous. It could either be a missing array value or.*'):
mt = hl.import_vcf(resource(f'all_possible_ambiguous_info_array_fields.vcf'))
print(mt.rows().collect())


def test_import_vcf_disambiguator_changing_type_has_good_error_message():
with pytest.raises(HailUserError, match='Disambiguator must have same type as field: field NumberA had type array<str> but disambiguator had type int32.'):
mt = hl.import_vcf(
resource(f'all_possible_ambiguous_info_array_fields.vcf'),
disambiguate_single_dot={'NumberA': lambda x: 42}
)
print(mt.rows().collect())


def test_import_vcf_disambiguator_is_used_or_ambiguous_cases():
mt = hl.import_vcf(
resource(f'all_possible_ambiguous_info_array_fields.vcf'),
disambiguate_single_dot=dict(
Number2=lambda _: ["Number2 is Ambiguous"],
NumberA=lambda _: ["NumberA is Ambiguous"],
NumberR=lambda _: ["NumberR is Ambiguous"],
NumberG=lambda _: ["NumberG is Ambiguous"],
NumberDot=lambda _: ["NumberDot is Ambiguous"],
)
)
one_alt, two_alts, no_ambiguities = mt.rows().collect()

assert one_alt == hl.Struct(
locus=hl.Locus('1', 1),
alleles=['A', 'T'],
rsid=None,
qual=-10.0,
filters={'LowQual'},
info=hl.Struct(
Number0=True,
Number1=None,
Number2=None,
NumberA=["NumberA is Ambiguous"],
NumberR=None,
NumberG=None,
NumberDot=["NumberDot is Ambiguous"],
),
)

assert two_alts == hl.Struct(
locus=hl.Locus('1', 1),
alleles=['A', 'T', 'G'],
rsid=None,
qual=-10.0,
filters={'LowQual'},
info=hl.Struct(
Number0=True,
Number1=None,
Number2=None,
NumberA=None,
NumberR=None,
NumberG=None,
NumberDot=["NumberDot is Ambiguous"],
),
)

assert no_ambiguities == hl.Struct(
locus=hl.Locus('1', 2),
alleles=['A', 'T', 'G'],
rsid=None,
qual=-10.0,
filters={'LowQual'},
info=hl.Struct(
Number0=True,
Number1='X',
Number2=['X', None],
NumberA=[None, 'X'],
NumberR=[None, 'X', None],
NumberG=[None, 'X', None, 'X', None, 'X'],
NumberDot=[None, 'X', 'X', 'X', None],
),
)
15 changes: 13 additions & 2 deletions hail/src/main/scala/is/hail/io/vcf/LoadVCF.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1809,8 +1809,12 @@ class MatrixVCFReader(
IRParser.parseType(params.entryFloatTypeName),
params.callFields)

val globalType = TStruct(
"vcf_header" -> VCFHeaderInfo.headerType.typeAfterSelectNames(Array("formatAttrs", "filterAttrs", "infoAttrs"))
)

def fullMatrixTypeWithoutUIDs: MatrixType = MatrixType(
globalType = TStruct.empty,
globalType = globalType,
colType = TStruct("s" -> TString),
colKey = Array("s"),
rowType = TStruct(
Expand Down Expand Up @@ -1892,7 +1896,14 @@ class MatrixVCFReader(
GenericLines.read(fs, fileStatuses, params.nPartitions, params.blockSizeInMB, params.minPartitions, params.gzAsBGZ, params.forceGZ)
}

val globals = Row(sampleIDs.zipWithIndex.map { case (s, i) => Row(s, i.toLong) }.toFastIndexedSeq)
val globals = Row(
Row(
header.formatAttrs,
header.filtersAttrs,
header.infoAttrs
),
sampleIDs.zipWithIndex.map { case (s, i) => Row(s, i.toLong) }.toFastIndexedSeq
)

val fullRowPType: PType = fullRVDType.rowType

Expand Down
Loading

0 comments on commit ec88866

Please sign in to comment.