diff --git a/core/src/main/java/com/mongodb/hadoop/mapred/MongoInputFormat.java b/core/src/main/java/com/mongodb/hadoop/mapred/MongoInputFormat.java index 110fdcb1..377047fe 100644 --- a/core/src/main/java/com/mongodb/hadoop/mapred/MongoInputFormat.java +++ b/core/src/main/java/com/mongodb/hadoop/mapred/MongoInputFormat.java @@ -33,11 +33,11 @@ import com.mongodb.hadoop.io.*; import com.mongodb.BasicDBObject; -public class MongoInputFormat implements InputFormat { +public class MongoInputFormat implements InputFormat { @SuppressWarnings("deprecation") - public RecordReader getRecordReader(InputSplit split, + public RecordReader getRecordReader(InputSplit split, JobConf job, Reporter reporter) { if (!(split instanceof MongoInputSplit)) diff --git a/core/src/main/java/com/mongodb/hadoop/mapred/input/MongoRecordReader.java b/core/src/main/java/com/mongodb/hadoop/mapred/input/MongoRecordReader.java index 8a17a70b..465a3543 100644 --- a/core/src/main/java/com/mongodb/hadoop/mapred/input/MongoRecordReader.java +++ b/core/src/main/java/com/mongodb/hadoop/mapred/input/MongoRecordReader.java @@ -26,7 +26,7 @@ @SuppressWarnings( "deprecation" ) -public class MongoRecordReader implements RecordReader { +public class MongoRecordReader implements RecordReader { public MongoRecordReader( MongoInputSplit split ){ _cursor = split.getCursor(); @@ -38,20 +38,20 @@ public void close(){ _cursor.close(); } - public BasicDBObject createKey(){ - return new BasicDBObject(); + public BSONWritable createKey(){ + return new BSONWritable(); } - public BasicDBObject createValue(){ - return new BasicDBObject(); + public BSONWritable createValue(){ + return new BSONWritable(); } - public BasicDBObject getCurrentKey(){ - return new BasicDBObject( "_id", _current.get(_keyField != null ? _keyField : "_id") ); + public BSONWritable getCurrentKey(){ + return new BSONWritable(new BasicDBObject( "_id", _current.get(_keyField != null ? _keyField : "_id") )); } - public BasicDBObject getCurrentValue(){ + public BSONWritable getCurrentValue(){ return _current; } @@ -82,7 +82,7 @@ public boolean nextKeyValue(){ if ( !_cursor.hasNext() ) return false; - _current = (BasicDBObject)_cursor.next(); + _current = new BSONWritable(_cursor.next()); _seen++; return true; @@ -92,7 +92,7 @@ public boolean nextKeyValue(){ } } - public boolean next( BasicDBObject key, BasicDBObject value ){ + public boolean next( BSONWritable key, BSONWritable value ){ if ( nextKeyValue() ){ log.debug( "Had another k/v" ); key.put( "_id", getCurrentKey().get( "_id" ) ); @@ -107,7 +107,7 @@ public boolean next( BasicDBObject key, BasicDBObject value ){ } private final DBCursor _cursor; - private BasicDBObject _current; + private BSONWritable _current; private float _seen = 0; private float _total; private String _keyField; diff --git a/streaming/examples/treasury/mapper.py b/streaming/examples/treasury/mapper.py index f76442f9..4ba8def7 100755 --- a/streaming/examples/treasury/mapper.py +++ b/streaming/examples/treasury/mapper.py @@ -7,6 +7,7 @@ try: from pymongo_hadoop import BSONMapper import pymongo_hadoop + print >> sys.stderr, "pymongo_hadoop is not installed or in path - will try to import from source tree." except: here = os.path.abspath(__file__) module_dir = os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(here))), @@ -17,8 +18,11 @@ from pymongo_hadoop import BSONMapper def mapper(documents): + print >> sys.stderr, "Running python mapper." + for doc in documents: yield {'_id': doc['_id'].year, 'bc10Year': doc['bc10Year']} + print >> sys.stderr, "Python mapper finished." + BSONMapper(mapper) -print >> sys.stderr, "Done Mapping." diff --git a/testing/run_treasury.py b/testing/run_treasury.py index f80ac4d3..b2a02784 100644 --- a/testing/run_treasury.py +++ b/testing/run_treasury.py @@ -98,7 +98,7 @@ def compare_results(collection): STREAMING_JARPATH=os.path.join(MONGO_HADOOP_ROOT, "streaming", "target", - "mongo-hadoop-streaming*.jar") + "mongo-hadoop-streaming_cdh4b1-1.1.0-SNAPSHOT.jar") STREAMING_MAPPERPATH=os.path.join(MONGO_HADOOP_ROOT, "streaming", "examples", @@ -130,10 +130,11 @@ def compare_results(collection): } DEFAULT_OLD_PARAMETERS = DEFAULT_PARAMETERS.copy() -DEFAULT_OLD_PARAMETERS["mongo.job.mapper"] = "com.mongodb.hadoop.examples.treasury.TreasuryYieldMapperV2" -DEFAULT_OLD_PARAMETERS["mongo.job.reducer"] = "com.mongodb.hadoop.examples.treasury.TreasuryYieldReducerV2" -DEFAULT_OLD_PARAMETERS["mongo.job.input.format"] = "com.mongodb.hadoop.mapred.MongoInputFormat" -DEFAULT_OLD_PARAMETERS["mongo.job.output.format"] = "com.mongodb.hadoop.mapred.MongoOutputFormat" +DEFAULT_OLD_PARAMETERS.update( + { "mongo.job.mapper": "com.mongodb.hadoop.examples.treasury.TreasuryYieldMapperV2", + "mongo.job.reducer": "com.mongodb.hadoop.examples.treasury.TreasuryYieldReducerV2", + "mongo.job.input.format": "com.mongodb.hadoop.mapred.MongoInputFormat", + "mongo.job.output.format": "com.mongodb.hadoop.mapred.MongoOutputFormat"}) def runjob(hostname, params, input_collection='mongo_hadoop.yield_historical.in', output_collection='mongo_hadoop.yield_historical.out', @@ -165,19 +166,24 @@ def runjob(hostname, params, input_collection='mongo_hadoop.yield_historical.in' subprocess.call(' '.join(cmd), shell=True) def runstreamingjob(hostname, params, input_collection='mongo_hadoop.yield_historical.in', - output_collection='mongo_hadoop.yield_historical.out', readpref="primary"): + output_collection='mongo_hadoop.yield_historical.out', readpref="primary", input_auth=None, output_auth=None): + cmd = [os.path.join(HADOOP_HOME, "bin", "hadoop")] cmd.append("jar") - cmd.append(STREAMING_JARPATH) - - for key, val in params.items(): - cmd.append("-" + key) - cmd.append(val) - - cmd.append("-inputURI") - cmd.append("mongodb://%s/%s?readPreference=%s" % (hostname, input_collection, readpref)) - cmd.append("-outputURI") - cmd.append("mongodb://%s/%s" % (hostname, output_collection)) + cmd.append('$HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming*') + cmd += ["-libjars", '$HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-2.0.0-cdh4.2.0.jar,'+STREAMING_JARPATH] + cmd += ["-input", '/tmp/in'] + cmd += ["-output", '/tmp/out'] + cmd += ["-inputformat", 'com.mongodb.hadoop.mapred.MongoInputFormat'] + cmd += ["-outputformat", 'com.mongodb.hadoop.mapred.MongoOutputFormat'] + cmd += ["-io", 'mongodb'] + input_uri = 'mongodb://%s%s/%s?readPreference=%s' % (input_auth + "@" if input_auth else '', hostname, input_collection, readpref) + cmd += ['-jobconf', "mongo.input.uri=%s" % input_uri] + output_uri = "mongo.output.uri=mongodb://%s%s/%s" % (output_auth + "@" if output_auth else '', hostname, output_collection) + cmd += ['-jobconf', output_uri] + cmd += ['-jobconf', 'stream.io.identifier.resolver.class=com.mongodb.hadoop.streaming.io.MongoIdentifierResolver'] + cmd += ['-mapper', params['mapper']] + cmd += ['-reducer', params['reducer']] print cmd subprocess.call(' '.join(cmd), shell=True) @@ -396,6 +402,7 @@ class TestStreaming(Standalone): @unittest.skipIf(HADOOP_RELEASE.startswith('1.0') or HADOOP_RELEASE.startswith('0.20'), 'streaming not supported') def test_treasury(self): + PARAMETERS = DEFAULT_OLD_PARAMETERS.copy() runstreamingjob(self.server_hostname, {'mapper': STREAMING_MAPPERPATH, 'reducer':STREAMING_REDUCERPATH}) out_col = self.server.connection()['mongo_hadoop']['yield_historical.out'] self.assertTrue(compare_results(out_col))