Skip to content

Commit

Permalink
Attribution support (#418)
Browse files Browse the repository at this point in the history
* Support attribution

* Feature: enable negative example auto-generate

* Fix unexpected converion of bool in argparse

* Bugfix: meta.example_ids maybe dup

* Code format

* Add log

* Pass the neg generating flag

* Pass the neg generating flag

Co-authored-by: duanbing.0 <[email protected]>
  • Loading branch information
duanbing and duanbing.0 authored Nov 26, 2020
1 parent f44e2e7 commit c10573b
Show file tree
Hide file tree
Showing 13 changed files with 1,087 additions and 8 deletions.
6 changes: 4 additions & 2 deletions deploy/scripts/data_join/run_data_join_worker.sh
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ example_id_dump_threshold=$(normalize_env_to_args "--example_id_dump_threshold"
data_block_builder=$(normalize_env_to_args "--data_block_builder" $DATA_BLOCK_BUILDER)
data_block_compressed_type=$(normalize_env_to_args "--data_block_compressed_type" $DATA_BLOCK_COMPRESSED_TYPE)
kvstore_type=$(normalize_env_to_args '--kvstore_type' $KVSTORE_TYPE)

max_conversion_delay=$(normalize_env_to_args '--max_conversion_delay' $MAX_CONVERSION_DELAY)
enable_negative_example_generator=$(normalize_env_to_args '--enable_negative_example_generator' $ENABLE_NEGATIVE_EXAMPLE_GENERATOR)
python -m fedlearner.data_join.cmd.data_join_worker_service \
$PEER_ADDR \
$MASTER_POD_NAMES \
Expand All @@ -47,4 +48,5 @@ python -m fedlearner.data_join.cmd.data_join_worker_service \
$data_block_dump_interval $data_block_dump_threshold \
$example_id_dump_interval $example_id_dump_threshold \
$data_block_builder $data_block_compressed_type \
$kvstore_type
$kvstore_type $max_conversion_delay \
$enable_negative_example_generator
26 changes: 26 additions & 0 deletions fedlearner/common/argparse_util.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# Copyright 2020 The FedLearner Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# coding: utf-8

import argparse

def str_as_bool(v):
if isinstance(v, bool):
return v
if v.lower() in ('yes', 'true', 't', 'y', '1'):
return True
if v.lower() in ('no', 'false', 'f', 'n', '0'):
return False
raise argparse.ArgumentTypeError('Boolean value expected.')
14 changes: 14 additions & 0 deletions fedlearner/data_join/cmd/data_join_worker_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@
import tensorflow

from fedlearner.common import data_join_service_pb2 as dj_pb
from fedlearner.common.argparse_util import str_as_bool
from fedlearner.data_join.common import get_kvstore_config
from fedlearner.data_join.data_join_worker import DataJoinWorkerService
from fedlearner.data_join.common import interval_to_timestamp
tensorflow.compat.v1.enable_eager_execution()

if __name__ == "__main__":
Expand Down Expand Up @@ -76,6 +78,14 @@
parser.add_argument('--data_block_compressed_type', type=str, default='',
choices=['', 'ZLIB', 'GZIP'],
help='the compressed type for data block')
parser.add_argument('--max_conversion_delay', type=str, default="7D",
help='the max delay of an impression occurred '\
'before a conversion as an attribution pair, unit: '\
'{Y|M|D|H|N|S}, i.e. 1N20S equals 80 seconds')
parser.add_argument('--enable_negative_example_generator', type=str_as_bool,
default=False, const=True, nargs='?',
help="enable the negative example auto-generator, "\
"filled with label: 0")
args = parser.parse_args()
worker_options = dj_pb.DataJoinWorkerOptions(
use_mock_etcd=(args.kvstore_type == 'mock'),
Expand All @@ -91,6 +101,10 @@
max_matching_window=args.max_matching_window,
data_block_dump_interval=args.data_block_dump_interval,
data_block_dump_threshold=args.data_block_dump_threshold,
max_conversion_delay=interval_to_timestamp(\
args.max_conversion_delay),
enable_negative_example_generator=\
args.enable_negative_example_generator,
),
example_id_dump_options=dj_pb.ExampleIdDumpOptions(
example_id_dump_interval=args.example_id_dump_interval,
Expand Down
26 changes: 26 additions & 0 deletions fedlearner/data_join/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -371,3 +371,29 @@ def get_kvstore_config(kvstore_type):
addr = os.environ.get('ETCD_ADDR', 'localhost:2379')
base_dir = os.environ.get('ETCD_BASE_DIR', 'fedlearner')
return name, addr, None, None, base_dir

def interval_to_timestamp(itv):
unit = ["Y", "M", "D", "H", "N", "S"]
multiple = [3600*24*30*12, 3600*24*30, 3600*24, 3600, 60, 1]
unit_order, unit_no = {}, {}
for i, item in enumerate(unit):
unit_order[item] = len(unit) - i
s_no = ""
prv_order = len(unit) + 1
for c in itv:
if c.isdigit():
s_no += c
else:
c = c.upper()
if c not in unit_order or prv_order <= unit_order[c]:
return None
unit_no[c] = s_no
prv_order = unit_order[c]
s_no = ""
tmstmp = 0
if len(s_no) > 0 and "S" not in unit_no:
unit_no["S"] = s_no
for i, item in enumerate(unit):
if item in unit_no:
tmstmp += int(unit_no[item]) * multiple[i]
return tmstmp
4 changes: 3 additions & 1 deletion fedlearner/data_join/data_block_dumper.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,9 @@ def _dump_data_block_by_meta(self, meta):
example_num = len(meta.example_ids)
for (index, item) in self._raw_data_visitor:
example_id = item.example_id
if example_id == meta.example_ids[match_index]:
# ELements in meta.example_ids maybe duplicated
while match_index < example_num and\
example_id == meta.example_ids[match_index]:
data_block_builder.write_item(item)
match_index += 1
if match_index >= example_num:
Expand Down
19 changes: 14 additions & 5 deletions fedlearner/data_join/data_block_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,8 @@ def init_by_meta(self, meta):
def set_data_block_manager(self, data_block_manager):
self._data_block_manager = data_block_manager

def append_item(self, item, leader_index, follower_index, event_time=None):
def append_item(self, item, leader_index, follower_index, event_time=None,\
allow_dup=False):
example_id = item.example_id
if event_time is None:
event_time = item.event_time
Expand All @@ -75,10 +76,18 @@ def append_item(self, item, leader_index, follower_index, event_time=None):
self._data_block_meta.start_time = event_time
self._data_block_meta.end_time = event_time
else:
assert self._data_block_meta.leader_start_index < leader_index, \
"leader start index should be incremental"
assert self._data_block_meta.leader_end_index < leader_index, \
"leader end index should be incremental"
if not allow_dup:
assert self._data_block_meta.leader_start_index < leader_index,\
"leader start index should be incremental"
assert self._data_block_meta.leader_end_index < leader_index, \
"leader end index should be incremental"
else:
assert self._data_block_meta.leader_start_index <= \
leader_index,\
"leader start index should be incremental by GE"
assert self._data_block_meta.leader_end_index <= leader_index, \
"leader end index should be incremental by LE"

self._data_block_meta.leader_end_index = leader_index
if event_time < self._data_block_meta.start_time:
self._data_block_meta.start_time = event_time
Expand Down
Loading

0 comments on commit c10573b

Please sign in to comment.