Skip to content

Commit

Permalink
Re-fit streaming tests to use the updated mapreduce packages.
Browse files Browse the repository at this point in the history
  • Loading branch information
mpobrien committed Apr 26, 2013
1 parent efb3081 commit 6370312
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,11 @@
import com.mongodb.hadoop.io.*;
import com.mongodb.BasicDBObject;

public class MongoInputFormat implements InputFormat<BasicDBObject, BasicDBObject> {
public class MongoInputFormat implements InputFormat<BSONWritable, BSONWritable> {


@SuppressWarnings("deprecation")
public RecordReader<BasicDBObject, BasicDBObject> getRecordReader(InputSplit split,
public RecordReader<BSONWritable, BSONWritable> getRecordReader(InputSplit split,
JobConf job,
Reporter reporter) {
if (!(split instanceof MongoInputSplit))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@


@SuppressWarnings( "deprecation" )
public class MongoRecordReader implements RecordReader<BasicDBObject, BasicDBObject> {
public class MongoRecordReader implements RecordReader<BSONWritable, BSONWritable> {

public MongoRecordReader( MongoInputSplit split ){
_cursor = split.getCursor();
Expand All @@ -38,20 +38,20 @@ public void close(){
_cursor.close();
}

public BasicDBObject createKey(){
return new BasicDBObject();
public BSONWritable createKey(){
return new BSONWritable();
}


public BasicDBObject createValue(){
return new BasicDBObject();
public BSONWritable createValue(){
return new BSONWritable();
}

public BasicDBObject getCurrentKey(){
return new BasicDBObject( "_id", _current.get(_keyField != null ? _keyField : "_id") );
public BSONWritable getCurrentKey(){
return new BSONWritable(new BasicDBObject( "_id", _current.get(_keyField != null ? _keyField : "_id") ));
}

public BasicDBObject getCurrentValue(){
public BSONWritable getCurrentValue(){
return _current;
}

Expand Down Expand Up @@ -82,7 +82,7 @@ public boolean nextKeyValue(){
if ( !_cursor.hasNext() )
return false;

_current = (BasicDBObject)_cursor.next();
_current = new BSONWritable(_cursor.next());
_seen++;

return true;
Expand All @@ -92,7 +92,7 @@ public boolean nextKeyValue(){
}
}

public boolean next( BasicDBObject key, BasicDBObject value ){
public boolean next( BSONWritable key, BSONWritable value ){
if ( nextKeyValue() ){
log.debug( "Had another k/v" );
key.put( "_id", getCurrentKey().get( "_id" ) );
Expand All @@ -107,7 +107,7 @@ public boolean next( BasicDBObject key, BasicDBObject value ){
}

private final DBCursor _cursor;
private BasicDBObject _current;
private BSONWritable _current;
private float _seen = 0;
private float _total;
private String _keyField;
Expand Down
6 changes: 5 additions & 1 deletion streaming/examples/treasury/mapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
try:
from pymongo_hadoop import BSONMapper
import pymongo_hadoop
print >> sys.stderr, "pymongo_hadoop is not installed or in path - will try to import from source tree."
except:
here = os.path.abspath(__file__)
module_dir = os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(here))),
Expand All @@ -17,8 +18,11 @@
from pymongo_hadoop import BSONMapper

def mapper(documents):
print >> sys.stderr, "Running python mapper."

for doc in documents:
yield {'_id': doc['_id'].year, 'bc10Year': doc['bc10Year']}

print >> sys.stderr, "Python mapper finished."

BSONMapper(mapper)
print >> sys.stderr, "Done Mapping."
39 changes: 23 additions & 16 deletions testing/run_treasury.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ def compare_results(collection):
STREAMING_JARPATH=os.path.join(MONGO_HADOOP_ROOT,
"streaming",
"target",
"mongo-hadoop-streaming*.jar")
"mongo-hadoop-streaming_cdh4b1-1.1.0-SNAPSHOT.jar")
STREAMING_MAPPERPATH=os.path.join(MONGO_HADOOP_ROOT,
"streaming",
"examples",
Expand Down Expand Up @@ -130,10 +130,11 @@ def compare_results(collection):
}

DEFAULT_OLD_PARAMETERS = DEFAULT_PARAMETERS.copy()
DEFAULT_OLD_PARAMETERS["mongo.job.mapper"] = "com.mongodb.hadoop.examples.treasury.TreasuryYieldMapperV2"
DEFAULT_OLD_PARAMETERS["mongo.job.reducer"] = "com.mongodb.hadoop.examples.treasury.TreasuryYieldReducerV2"
DEFAULT_OLD_PARAMETERS["mongo.job.input.format"] = "com.mongodb.hadoop.mapred.MongoInputFormat"
DEFAULT_OLD_PARAMETERS["mongo.job.output.format"] = "com.mongodb.hadoop.mapred.MongoOutputFormat"
DEFAULT_OLD_PARAMETERS.update(
{ "mongo.job.mapper": "com.mongodb.hadoop.examples.treasury.TreasuryYieldMapperV2",
"mongo.job.reducer": "com.mongodb.hadoop.examples.treasury.TreasuryYieldReducerV2",
"mongo.job.input.format": "com.mongodb.hadoop.mapred.MongoInputFormat",
"mongo.job.output.format": "com.mongodb.hadoop.mapred.MongoOutputFormat"})

def runjob(hostname, params, input_collection='mongo_hadoop.yield_historical.in',
output_collection='mongo_hadoop.yield_historical.out',
Expand Down Expand Up @@ -165,19 +166,24 @@ def runjob(hostname, params, input_collection='mongo_hadoop.yield_historical.in'
subprocess.call(' '.join(cmd), shell=True)

def runstreamingjob(hostname, params, input_collection='mongo_hadoop.yield_historical.in',
output_collection='mongo_hadoop.yield_historical.out', readpref="primary"):
output_collection='mongo_hadoop.yield_historical.out', readpref="primary", input_auth=None, output_auth=None):

cmd = [os.path.join(HADOOP_HOME, "bin", "hadoop")]
cmd.append("jar")
cmd.append(STREAMING_JARPATH)

for key, val in params.items():
cmd.append("-" + key)
cmd.append(val)

cmd.append("-inputURI")
cmd.append("mongodb://%s/%s?readPreference=%s" % (hostname, input_collection, readpref))
cmd.append("-outputURI")
cmd.append("mongodb://%s/%s" % (hostname, output_collection))
cmd.append('$HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming*')
cmd += ["-libjars", '$HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-2.0.0-cdh4.2.0.jar,'+STREAMING_JARPATH]
cmd += ["-input", '/tmp/in']
cmd += ["-output", '/tmp/out']
cmd += ["-inputformat", 'com.mongodb.hadoop.mapred.MongoInputFormat']
cmd += ["-outputformat", 'com.mongodb.hadoop.mapred.MongoOutputFormat']
cmd += ["-io", 'mongodb']
input_uri = 'mongodb://%s%s/%s?readPreference=%s' % (input_auth + "@" if input_auth else '', hostname, input_collection, readpref)
cmd += ['-jobconf', "mongo.input.uri=%s" % input_uri]
output_uri = "mongo.output.uri=mongodb://%s%s/%s" % (output_auth + "@" if output_auth else '', hostname, output_collection)
cmd += ['-jobconf', output_uri]
cmd += ['-jobconf', 'stream.io.identifier.resolver.class=com.mongodb.hadoop.streaming.io.MongoIdentifierResolver']
cmd += ['-mapper', params['mapper']]
cmd += ['-reducer', params['reducer']]

print cmd
subprocess.call(' '.join(cmd), shell=True)
Expand Down Expand Up @@ -396,6 +402,7 @@ class TestStreaming(Standalone):
@unittest.skipIf(HADOOP_RELEASE.startswith('1.0') or HADOOP_RELEASE.startswith('0.20'),
'streaming not supported')
def test_treasury(self):
PARAMETERS = DEFAULT_OLD_PARAMETERS.copy()
runstreamingjob(self.server_hostname, {'mapper': STREAMING_MAPPERPATH, 'reducer':STREAMING_REDUCERPATH})
out_col = self.server.connection()['mongo_hadoop']['yield_historical.out']
self.assertTrue(compare_results(out_col))
Expand Down

0 comments on commit 6370312

Please sign in to comment.