v2.1.15 dataflows>=0.0.65 with deduplicate processor
akariv committed Dec 26, 2019
1 parent c218677 commit 054e72b
Showing 9 changed files with 153 additions and 7 deletions.
21 changes: 21 additions & 0 deletions README.md
@@ -740,6 +740,27 @@ Filtering just American and European countries, leaving out countries whose main
sort-by: "{country_name}"
```

### ***`deduplicate`***

Deduplicates rows in resources, based on each resource's primary key.

`deduplicate` accepts a resource specifier. For each specified resource it emits only unique rows, comparing rows on the values of the primary key fields: the first row seen for each key is kept, and any later row with the same primary key is dropped.

_Parameters_:

- `resources` - Which resources to deduplicate. Same semantics as `resources` in `stream_remote_resources`.

*Examples*:

Deduplicating rows in the `world_population` resource.

```yaml
- run: deduplicate
parameters:
resources: world_population
```
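
Because rows are compared only on the primary key fields, the resource schema must declare a `primaryKey` before this step runs. A fuller, hypothetical `pipeline-spec.yaml` sketch (the pipeline id and paths are illustrative, and it assumes the loaded datapackage already declares the primary key in its schema):

```yaml
world-population-dedup:
  pipeline:
    - run: load
      parameters:
        from: data/datapackage.json  # schema declares primaryKey for world_population
    - run: deduplicate
      parameters:
        resources: world_population
    - run: dump_to_path
      parameters:
        out-path: deduplicated
```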


### ***`duplicate`***

Duplicate a resource.
2 changes: 1 addition & 1 deletion datapackage_pipelines/VERSION
@@ -1 +1 @@
-2.1.14
+2.1.15
16 changes: 16 additions & 0 deletions datapackage_pipelines/lib/deduplicate.py
@@ -0,0 +1,16 @@
from dataflows import Flow, deduplicate
from datapackage_pipelines.wrapper import ingest
from datapackage_pipelines.utilities.flow_utils import spew_flow


def flow(parameters):
return Flow(
deduplicate(
resources=parameters.get('resources'),
)
)


if __name__ == '__main__':
with ingest() as ctx:
spew_flow(flow(ctx.parameters), ctx)
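
The implementation follows the standard pattern for flow-based processors: `ingest()` supplies the step's parameters and context, and `spew_flow` runs the `dataflows` flow and emits its output to the next step. For intuition, here is a minimal, hypothetical sketch of what `dataflows.deduplicate` does, run on inline data (the `set_pk` step and sample rows are illustrative and not part of this commit):

```python
from dataflows import Flow, deduplicate, printer


def set_pk(package):
    # deduplicate() keys on the schema's primaryKey, so declare one first.
    package.pkg.descriptor['resources'][0]['schema']['primaryKey'] = ['a']
    yield package.pkg   # re-emit the modified datapackage descriptor
    yield from package  # pass the resource iterators through untouched


rows = [
    {'a': 1, 'b': 'kept'},
    {'a': 1, 'b': 'dropped - repeats a=1'},
    {'a': 2, 'b': 'kept'},
]

# Prints two rows: the second occurrence of a=1 is filtered out.
Flow(rows, set_pk, deduplicate(), printer()).process()
```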
2 changes: 1 addition & 1 deletion setup.py
@@ -39,7 +39,7 @@ def read(*paths):
'cachetools',
'tabulator>=1.17.0',
'globster>=0.1.0',
-'dataflows>=0.0.57',
+'dataflows>=0.0.65',
'python-dateutil<2.8.1',
]
SPEEDUP_REQUIRES = [
109 changes: 109 additions & 0 deletions tests/stdlib/fixtures/simple_deduplicate
@@ -0,0 +1,109 @@
deduplicate
--
{
"resources": ["concat-a1", "concat-a2"]
}
--
{
"name": "test",
"resources": [
{
"name": "concat-a1",
"dpp:streaming": true,
"path": "concat-a1.csv",
"schema": { "fields": [
{"name": "a1", "type": "string"},
{"name": "a2", "type": "string"},
{"name": "a3", "type": "string"}
], "primaryKey": ["a1", "a2"]}
},
{
"name": "concat-a2",
"dpp:streaming": true,
"path": "concat-a2.csv",
"schema": { "fields": [
{"name": "a1", "type": "string"},
{"name": "a2", "type": "string"},
{"name": "a3", "type": "string"}
]}
},
{
"name": "concat-c",
"dpp:streaming": true,
"path": "concat-c.csv",
"schema": { "fields": [
{"name": "c1", "type": "string"},
{"name": "c2", "type": "string"},
{"name": "c3", "type": "string"}
]}
}
]
}
--
{"a1":"a1","a2":"a1","a3":"a2"}
{"a1":"a2","a2":"a1","a3":"a1"}
{"a1":"a1","a2":"a1","a3":"a2"}
{"a1":"a2","a2":"a1","a3":"a1"}

{"a1":"a1","a2":"a3","a3":"a2"}
{"a1":"a2","a2":"a3","a3":"a1"}
{"a1":"a3","a2":"a4","a3":"a2"}
{"a1":"a4","a2":"a4","a3":"a1"}

{"c1":"c11","c2":"c21","c3":"c31"}
{"c1":"c12","c2":"c22","c3":"c32"}
{"c1":"c13","c2":"c23","c3":"c33"}
--
{
"name": "test",
"profile": "data-package",
"resources": [
{
"name": "concat-a1",
"dpp:streaming": true,
"path": "concat-a1.csv",
"profile": "data-resource",
"schema": { "fields": [
{"name": "a1", "type": "string"},
{"name": "a2", "type": "string"},
{"name": "a3", "type": "string"}
], "primaryKey": ["a1", "a2"]}
},
{
"name": "concat-a2",
"dpp:streaming": true,
"path": "concat-a2.csv",
"profile": "data-resource",
"schema": { "fields": [
{"name": "a1", "type": "string"},
{"name": "a2", "type": "string"},
{"name": "a3", "type": "string"}
]}
},
{
"name": "concat-c",
"dpp:streaming": true,
"path": "concat-c.csv",
"profile": "data-resource",
"schema": { "fields": [
{"name": "c1", "type": "string"},
{"name": "c2", "type": "string"},
{"name": "c3", "type": "string"}
]}
}
]
}
--
{"a1":"a1","a2":"a1","a3":"a2"}
{"a1":"a2","a2":"a1","a3":"a1"}

{"a1":"a1","a2":"a3","a3":"a2"}
{"a1":"a2","a2":"a3","a3":"a1"}
{"a1":"a3","a2":"a4","a3":"a2"}
{"a1":"a4","a2":"a4","a3":"a1"}

{"c1":"c11","c2":"c21","c3":"c31"}
{"c1":"c12","c2":"c22","c3":"c32"}
{"c1":"c13","c2":"c23","c3":"c33"}

{}
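
In this fixture format, the `--`-separated sections are: processor name, parameters, input datapackage, input rows, expected output datapackage, and expected output rows. The duplicated rows in `concat-a1`, whose schema declares `primaryKey: ["a1", "a2"]`, are reduced to their first occurrences in the expected output, while `concat-a2` (which has no primary key) and `concat-c` (which is not listed in `resources`) pass through unchanged.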
2 changes: 1 addition & 1 deletion tests/stdlib/fixtures/simple_dump_to_zip
@@ -57,4 +57,4 @@ dump_to_zip
--
{"mystring":"a", "myinteger": null, "mynumber": {"type{decimal}": "2.0"}, "mydate": {"type{date}": "2016-12-31"}}

{"bytes": 1074, "count_of_rows": 1, "dataset_name": "test", "hash": "1f3b5abdd2cf11345c8883f1c129eb99"}
{"bytes": 1143, "count_of_rows": 1, "dataset_name": "test", "hash": "c68a5400c197333d75d34f4c198fea0b"}
2 changes: 1 addition & 1 deletion tests/stdlib/fixtures/simple_dump_to_zip_with_hash
@@ -58,4 +58,4 @@ dump_to_zip
--
{"mystring":"a", "myinteger": null, "mynumber": {"type{decimal}": "2.0"}, "mydate": {"type{date}": "2016-12-31"}}

{"bytes": 1074, "count_of_rows": 1, "dataset_name": "test", "hash": "1f3b5abdd2cf11345c8883f1c129eb99"}
{"bytes": 1143, "count_of_rows": 1, "dataset_name": "test", "hash": "c68a5400c197333d75d34f4c198fea0b"}
@@ -61,4 +61,4 @@ dump_to_zip
--
{"mystring":"a", "myinteger": null, "mynumber": {"type{decimal}": "2.0"}, "mydate": {"type{date}": "2016-12-31"}}

{"bytes": 1074, "count_of_rows": 1, "dataset_name": "test", "hash": "1f3b5abdd2cf11345c8883f1c129eb99"}
{"bytes": 1143, "count_of_rows": 1, "dataset_name": "test", "hash": "c68a5400c197333d75d34f4c198fea0b"}
4 changes: 2 additions & 2 deletions tests/test_main.py
@@ -53,7 +53,7 @@ def test_pipeline():
{"pipeline_id": "./tests/env/dummy/pipeline-test-hooks", "event": "start"},
{"pipeline_id": "./tests/env/dummy/pipeline-test-hooks", "event": "finish", "success": True,
'stats': {'.dpp': {'out-datapackage-url': 'hooks-outputs/datapackage.json'},
-'bytes': 15715, 'count_of_rows': 40,
-'dataset_name': 'hook-tests', 'hash': 'aa1687f3c157e5bd733d102de0e80d29'}}
+'bytes': 15787, 'count_of_rows': 40,
+'dataset_name': 'hook-tests', 'hash': '9fc202087094c7becf98228a1327b21c'}}
]
assert progresses >= 1
