From 6708df80cb8dea7f7adc14b300be156019e87203 Mon Sep 17 00:00:00 2001 From: Darren O'Connor Date: Wed, 10 Nov 2021 00:55:14 +0000 Subject: [PATCH 1/3] experimenting --- rpsl-parser/examples/testdata/test.db | 55 +++++++++++++++++++++ rpsl-tools/main.go | 70 ++++++++++++--------------- 2 files changed, 86 insertions(+), 39 deletions(-) create mode 100644 rpsl-parser/examples/testdata/test.db diff --git a/rpsl-parser/examples/testdata/test.db b/rpsl-parser/examples/testdata/test.db new file mode 100644 index 0000000..d6f38c6 --- /dev/null +++ b/rpsl-parser/examples/testdata/test.db @@ -0,0 +1,55 @@ +aut-num: AS10279 +as-name: WCCOM-AS +descr: WEST CAROLINA COMM, LLC +descr: 229 Hwy 28 By-Pass +descr: Abbeville, SC 29620 +descr: US +admin-c: WCTEL1-ARIN +tech-c: TWN1-ARIN +mnt-by: MNT-WCCL-3 +changed: tom.nickles@wctel.com 20191217 +source: ARIN + +aut-num: AS10302 +as-name: NEGIA-AS +descr: Snappy Internet & Telecom +descr: Miami, Florida +member-of: AS-NEGIA-BACKBONE +tech-c: FI14-ARIN +admin-c: FI14-ARIN +mnt-by: MNT-COFS +notify: faisal@snappytelecom.net +changed: faisal@snappytelecom.net 20110920 +source: ARIN + +aut-num: AS10352 +as-name: SOLARUS-AS +descr: Solarus + 440 E Grand Avenue + Wisconsin Rapids WI 54494 + United States +mp-export: afi any.unicast to AS-ANY announce AS-10352ALL +admin-c: NEO4-ARIN +tech-c: JSL85-ARIN +tech-c: NEO4-ARIN +mnt-by: MNT-WCTC +created: 2021-07-08T11:59:46Z +last-modified: 2021-07-08T11:59:46Z +source: ARIN + +aut-num: AS10381 +as-name: SWCP-AS +descr: Southwest Cyberport, Inc + 5021 Indian School NE + Suite 600 + Albuquerque NM 87110 + United States +admin-c: JKC3-ARIN +tech-c: COSTL2-ARIN +tech-c: JKC3-ARIN +tech-c: MC143-ARIN +tech-c: NOS19-ARIN +mnt-by: MNT-SWCP +created: 2020-07-07T21:45:24Z +last-modified: 2021-10-07T17:53:59Z +source: ARIN \ No newline at end of file diff --git a/rpsl-tools/main.go b/rpsl-tools/main.go index 2ac6f2e..73d555f 100644 --- a/rpsl-tools/main.go +++ b/rpsl-tools/main.go @@ -28,7 +28,7 @@ import ( "strings" "github.com/golang/glog" - rpsl "github.com/manrs-tools/contrib/rpsl-parser" + "github.com/manrs-tools/contrib/rpsl-parser" rppb "github.com/manrs-tools/contrib/rpsl-parser/proto" ) @@ -71,11 +71,18 @@ func getReader(fn string) (io.Reader, error) { return fd, nil } +func prepWorker(ic <-chan string) <-chan *rppb.Record { + ch := make(chan *rppb.Record, 100) + go parseFile(ic, ch) + return ch +} + // parseFile reads and parses files as their filenames arrive on // the input channel. A parse error will abort processing for the // corresponding file and move to the next one -func parseFile(ic <-chan string, rc chan<- *rppb.Record, ec chan<- bool) { +func parseFile(ic <-chan string, ch chan *rppb.Record) { for fn := range ic { + //files++ var rdr *rpsl.Reader reader, err := getReader(fn) @@ -110,14 +117,19 @@ func parseFile(ic <-chan string, rc chan<- *rppb.Record, ec chan<- bool) { } // Parse the file, sending results back up the channel (rc). - rpsl.Parse(rdr, rc) + rpsl.Parse(rdr, ch) + } + + close(ch) +} - // When done parsing, send a file-done message on the end channel (ec). - ec <- true +func displayRecords(rc <-chan *rppb.Record) { + for v := range rc { + fmt.Printf("Record returned: %v\n", v) } } -// Verify that there files requested exist, open each in a goroutine and feed +// Verify that the files requested exist, open each in a goroutine and feed // those to the rpsl-parser library, returning each record to a channel for // disposition in the final data structure to be loaded into a DB. func main() { @@ -130,19 +142,8 @@ func main() { return } - // Two buffered channels, one for input and for the resulting records. ic := make(chan string, len(rpslFiles)) - rc := make(chan *rppb.Record, 100) - - // To signal that all files are done processing, - // use ec (end channel) to pass state of 'done with file X' - // to the Record processor. - ec := make(chan bool, len(rpslFiles)) - - // Start the parsing/worker thread - for i := 0; i < *threads; i++ { - go parseFile(ic, rc, ec) - } + var allRecords []<-chan *rppb.Record // Push each file into the input (ic) channel. for _, fn := range rpslFiles { @@ -150,27 +151,18 @@ func main() { } close(ic) - // Track the number of files completed. - files := 0 - // Read records from the channel, - // TODO(morrowc): I'm positive this is supposed to be simpler with - // a sync.WaitGroup. Investigate that later. -Loop: - for { - select { - case r := <-rc: - fmt.Printf("Record returned: %v\n", r) - if files == len(rpslFiles) && len(rc) == 0 { - break Loop - } - case <-ec: - files++ - if files == len(rpslFiles) && len(rc) == 0 { - break Loop - } - default: - } + // Start the parsing/worker thread + // TODO: if len(files) < threads, just use len(files) + for i := 0; i < *threads; i++ { + records := prepWorker(ic) + allRecords = append(allRecords, records) + } + fmt.Println("about to wait") + fmt.Println("finished waiting") + + for _, v := range allRecords { + displayRecords(v) } - close(rc) + //fmt.Printf("Processed with %d records", len(rc)) } From d7f8d09dc532e28545a424673192320d834effdd Mon Sep 17 00:00:00 2001 From: Darren O'Connor Date: Sun, 14 Nov 2021 01:07:44 +0000 Subject: [PATCH 2/3] Fix concurrency --- .gitignore | 5 ++ go.mod | 1 + go.sum | 2 + rpsl-parser/proto/rpsl.pb.go | 102 +++++++++++++++++++-------- rpsl-tools/main.go | 131 ++++++++++++++++++----------------- 5 files changed, 146 insertions(+), 95 deletions(-) create mode 100644 .gitignore diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..72b490a --- /dev/null +++ b/.gitignore @@ -0,0 +1,5 @@ +rpsl-parser/proto/rpsl.pb.go +rpsl-tools/data/rad2.db +rpsl-tools/data/rad3.db +rpsl-tools/data/radb.db +rpsl-parser/proto/rpsl.pb.go diff --git a/go.mod b/go.mod index 6c4c9b7..d2d7a85 100644 --- a/go.mod +++ b/go.mod @@ -7,6 +7,7 @@ require ( github.com/golang/protobuf v1.5.2 github.com/google/go-cmp v0.5.6 github.com/manrs-tools/contrib v0.0.0-20190703155503-b9ddd2b08592 + github.com/pkg/profile v1.6.0 ) require google.golang.org/protobuf v1.27.1 // indirect diff --git a/go.sum b/go.sum index df77850..817c167 100644 --- a/go.sum +++ b/go.sum @@ -8,6 +8,8 @@ github.com/google/go-cmp v0.5.6 h1:BKbKCqvP6I+rmFHt06ZmyQtvB8xAkWdhFyr0ZUNZcxQ= github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/manrs-tools/contrib v0.0.0-20190703155503-b9ddd2b08592 h1:QdJoifdAPuKychiugiNVpKPCR00BGWyvcsNF2sGwjnE= github.com/manrs-tools/contrib v0.0.0-20190703155503-b9ddd2b08592/go.mod h1:TWtXtOwANTt3GOnfY4RfYy5Sdju84mVEKu0qBi5X7gQ= +github.com/pkg/profile v1.6.0 h1:hUDfIISABYI59DyeB3OTay/HxSRwTQ8rB/H83k6r5dM= +github.com/pkg/profile v1.6.0/go.mod h1:qBsxPvzyUincmltOk6iyRVxHYg4adc0OFOv72ZdLa18= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= diff --git a/rpsl-parser/proto/rpsl.pb.go b/rpsl-parser/proto/rpsl.pb.go index 551bb9a..0924569 100644 --- a/rpsl-parser/proto/rpsl.pb.go +++ b/rpsl-parser/proto/rpsl.pb.go @@ -1,21 +1,13 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // source: rpsl.proto -/* -Package rpsl_proto is a generated protocol buffer package. - -It is generated from these files: - rpsl.proto - -It has these top-level messages: - KeyValue - Record -*/ package rpsl_proto -import proto "github.com/golang/protobuf/proto" -import fmt "fmt" -import math "math" +import ( + fmt "fmt" + proto "github.com/golang/protobuf/proto" + math "math" +) // Reference imports to suppress errors if they are not otherwise used. var _ = proto.Marshal @@ -26,7 +18,7 @@ var _ = math.Inf // is compatible with the proto package it is being compiled against. // A compilation error at this line likely means your copy of the // proto package needs to be updated. -const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package +const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package // Type is the ENUM of various record types that are possible // in standard RPSL. @@ -201,6 +193,7 @@ var Type_name = map[int32]string{ 95: "XXTE6", 96: "XXTESET", } + var Type_value = map[string]int32{ "UNKNOWN": 0, "EOF": 1, @@ -289,17 +282,43 @@ var Type_value = map[string]int32{ func (x Type) String() string { return proto.EnumName(Type_name, int32(x)) } -func (Type) EnumDescriptor() ([]byte, []int) { return fileDescriptor0, []int{0} } + +func (Type) EnumDescriptor() ([]byte, []int) { + return fileDescriptor_cf10760c6fc5b93a, []int{0} +} type KeyValue struct { - Key Type `protobuf:"varint,1,opt,name=key,enum=rpsl.proto.Type" json:"key,omitempty"` - Value string `protobuf:"bytes,2,opt,name=value" json:"value,omitempty"` + Key Type `protobuf:"varint,1,opt,name=key,proto3,enum=rpsl.proto.Type" json:"key,omitempty"` + Value string `protobuf:"bytes,2,opt,name=value,proto3" json:"value,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *KeyValue) Reset() { *m = KeyValue{} } +func (m *KeyValue) String() string { return proto.CompactTextString(m) } +func (*KeyValue) ProtoMessage() {} +func (*KeyValue) Descriptor() ([]byte, []int) { + return fileDescriptor_cf10760c6fc5b93a, []int{0} +} + +func (m *KeyValue) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_KeyValue.Unmarshal(m, b) +} +func (m *KeyValue) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_KeyValue.Marshal(b, m, deterministic) +} +func (m *KeyValue) XXX_Merge(src proto.Message) { + xxx_messageInfo_KeyValue.Merge(m, src) +} +func (m *KeyValue) XXX_Size() int { + return xxx_messageInfo_KeyValue.Size(m) +} +func (m *KeyValue) XXX_DiscardUnknown() { + xxx_messageInfo_KeyValue.DiscardUnknown(m) } -func (m *KeyValue) Reset() { *m = KeyValue{} } -func (m *KeyValue) String() string { return proto.CompactTextString(m) } -func (*KeyValue) ProtoMessage() {} -func (*KeyValue) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{0} } +var xxx_messageInfo_KeyValue proto.InternalMessageInfo func (m *KeyValue) GetKey() Type { if m != nil { @@ -318,14 +337,37 @@ func (m *KeyValue) GetValue() string { // // Record, a single rpsl record. type Record struct { - Type Type `protobuf:"varint,1,opt,name=type,enum=rpsl.proto.Type" json:"type,omitempty"` - Fields []*KeyValue `protobuf:"bytes,2,rep,name=fields" json:"fields,omitempty"` + Type Type `protobuf:"varint,1,opt,name=type,proto3,enum=rpsl.proto.Type" json:"type,omitempty"` + Fields []*KeyValue `protobuf:"bytes,2,rep,name=fields,proto3" json:"fields,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *Record) Reset() { *m = Record{} } +func (m *Record) String() string { return proto.CompactTextString(m) } +func (*Record) ProtoMessage() {} +func (*Record) Descriptor() ([]byte, []int) { + return fileDescriptor_cf10760c6fc5b93a, []int{1} +} + +func (m *Record) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_Record.Unmarshal(m, b) +} +func (m *Record) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_Record.Marshal(b, m, deterministic) +} +func (m *Record) XXX_Merge(src proto.Message) { + xxx_messageInfo_Record.Merge(m, src) +} +func (m *Record) XXX_Size() int { + return xxx_messageInfo_Record.Size(m) +} +func (m *Record) XXX_DiscardUnknown() { + xxx_messageInfo_Record.DiscardUnknown(m) } -func (m *Record) Reset() { *m = Record{} } -func (m *Record) String() string { return proto.CompactTextString(m) } -func (*Record) ProtoMessage() {} -func (*Record) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{1} } +var xxx_messageInfo_Record proto.InternalMessageInfo func (m *Record) GetType() Type { if m != nil { @@ -342,14 +384,14 @@ func (m *Record) GetFields() []*KeyValue { } func init() { + proto.RegisterEnum("rpsl.proto.Type", Type_name, Type_value) proto.RegisterType((*KeyValue)(nil), "rpsl.proto.KeyValue") proto.RegisterType((*Record)(nil), "rpsl.proto.Record") - proto.RegisterEnum("rpsl.proto.Type", Type_name, Type_value) } -func init() { proto.RegisterFile("rpsl.proto", fileDescriptor0) } +func init() { proto.RegisterFile("rpsl.proto", fileDescriptor_cf10760c6fc5b93a) } -var fileDescriptor0 = []byte{ +var fileDescriptor_cf10760c6fc5b93a = []byte{ // 732 bytes of a gzipped FileDescriptorProto 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x74, 0x54, 0x6b, 0x73, 0xdc, 0x44, 0x10, 0xc4, 0xb1, 0xe3, 0xd8, 0x6b, 0x08, 0x83, 0x30, 0x60, 0xde, 0xc6, 0x84, 0xc2, 0x84, 0x70, diff --git a/rpsl-tools/main.go b/rpsl-tools/main.go index 73d555f..7f182fa 100644 --- a/rpsl-tools/main.go +++ b/rpsl-tools/main.go @@ -26,6 +26,7 @@ import ( "io" "os" "strings" + "sync" "github.com/golang/glog" "github.com/manrs-tools/contrib/rpsl-parser" @@ -34,6 +35,7 @@ import ( var ( threads = flag.Int("threads", 4, "Max threads to use in parsing db files.") + display = flag.Bool("display", false, "Display decoded records on screen") ) // Define a slice of filenames to get as a single flag on startup. @@ -71,68 +73,62 @@ func getReader(fn string) (io.Reader, error) { return fd, nil } -func prepWorker(ic <-chan string) <-chan *rppb.Record { - ch := make(chan *rppb.Record, 100) - go parseFile(ic, ch) - return ch -} - // parseFile reads and parses files as their filenames arrive on // the input channel. A parse error will abort processing for the // corresponding file and move to the next one -func parseFile(ic <-chan string, ch chan *rppb.Record) { - for fn := range ic { - //files++ - var rdr *rpsl.Reader +func parseFile(fn string, sem chan int, rc chan *rppb.Record, wg *sync.WaitGroup) { + defer wg.Done() + sem <- 1 - reader, err := getReader(fn) - if err != nil { - glog.Infof("Failed to read file(%v): %v\n", fn, err) - continue - } - rdr = rpsl.NewReader(reader) + var rdr *rpsl.Reader - // Read all leading comments and whitespace. - err = rdr.ConsumeComment() - if err != nil { - glog.Infof("Failed reading file(%v): %v\n", fn, err) - continue - } - err = rdr.ConsumeLeadingWS() - if err != nil { - glog.Infof("Failed reading file(%v): %v\n", fn, err) - continue - } - - // The file must start with a letter, all IRR records start with a letter character. - r := rdr.Peek() - if !rpsl.IsLetter(r) { - glog.Infof("The first character read(%v) is not a letter, file unparsable.\n", string(r)) - // Add 2 more chars so finding the problem is more possible. - r, _, _ := rdr.Read() - glog.Infof("Next char: %v\n", string(r)) - r, _, _ = rdr.Read() - glog.Infof("Next char: %v\n", string(r)) - continue - } - - // Parse the file, sending results back up the channel (rc). - rpsl.Parse(rdr, ch) + reader, err := getReader(fn) + if err != nil { + glog.Infof("Failed to read file(%v): %v\n", fn, err) + <-sem + return } + rdr = rpsl.NewReader(reader) - close(ch) -} + // Read all leading comments and whitespace. + err = rdr.ConsumeComment() + if err != nil { + glog.Infof("Failed reading file(%v): %v\n", fn, err) + <-sem + return + } + err = rdr.ConsumeLeadingWS() + if err != nil { + glog.Infof("Failed reading file(%v): %v\n", fn, err) + <-sem + return + } -func displayRecords(rc <-chan *rppb.Record) { - for v := range rc { - fmt.Printf("Record returned: %v\n", v) + // The file must start with a letter, all IRR records start with a letter character. + r := rdr.Peek() + if !rpsl.IsLetter(r) { + glog.Infof("The first character read(%v) is not a letter, file unparsable.\n", string(r)) + // Add 2 more chars so finding the problem is more possible. + r, _, _ := rdr.Read() + glog.Infof("Next char: %v\n", string(r)) + r, _, _ = rdr.Read() + glog.Infof("Next char: %v\n", string(r)) + <-sem + return } + + // Parse the file, sending results back up the channel (rc). + rpsl.Parse(rdr, rc) + <-sem } // Verify that the files requested exist, open each in a goroutine and feed // those to the rpsl-parser library, returning each record to a channel for // disposition in the final data structure to be loaded into a DB. func main() { + //defer profile.Start(profile.CPUProfile, profile.ProfilePath(".")).Stop() + //defer profile.Start(profile.MemProfile, profile.ProfilePath(".")).Stop() + flag.Var(&rpslFiles, "rpslFiles", "Files to parse, irr/rpsl content, filenames as csv.") flag.Parse() @@ -141,28 +137,33 @@ func main() { flag.PrintDefaults() return } + numFiles := len(rpslFiles) + // If the amount of file is less than threads, only spin up enough threads for those files + if numFiles < *threads { + *threads = numFiles + } - ic := make(chan string, len(rpslFiles)) - var allRecords []<-chan *rppb.Record + sem := make(chan int, *threads) + var wg sync.WaitGroup + wg.Add(numFiles) - // Push each file into the input (ic) channel. - for _, fn := range rpslFiles { - ic <- fn - } - close(ic) + rc := make(chan *rppb.Record) - // Start the parsing/worker thread - // TODO: if len(files) < threads, just use len(files) - for i := 0; i < *threads; i++ { - records := prepWorker(ic) - allRecords = append(allRecords, records) - } - fmt.Println("about to wait") - fmt.Println("finished waiting") + var records []*rppb.Record + go func(ch <-chan *rppb.Record) { + for v := range ch { + if *display { + fmt.Printf("Received record: %v\n", v) + } + records = append(records, v) + } + }(rc) - for _, v := range allRecords { - displayRecords(v) + for _, fn := range rpslFiles { + go parseFile(fn, sem, rc, &wg) } + wg.Wait() + close(rc) - //fmt.Printf("Processed with %d records", len(rc)) + fmt.Printf("Received a total of %d records\n", len(records)) } From 32c7fe27c5692abba83ea3176c6a9cafbccf89f3 Mon Sep 17 00:00:00 2001 From: Darren O'Connor Date: Mon, 15 Nov 2021 15:09:21 +0000 Subject: [PATCH 3/3] Move away from receive channel --- rpsl-parser/irr.go | 20 ++++++++++++++++---- rpsl-tools/main.go | 29 +++++++++-------------------- 2 files changed, 25 insertions(+), 24 deletions(-) diff --git a/rpsl-parser/irr.go b/rpsl-parser/irr.go index 8bb1ada..08fdb3e 100644 --- a/rpsl-parser/irr.go +++ b/rpsl-parser/irr.go @@ -24,12 +24,24 @@ import ( "fmt" "io" "strings" + "sync" "unicode" "github.com/golang/glog" rppb "github.com/manrs-tools/contrib/rpsl-parser/proto" ) +type Records struct { + Records []*rppb.Record + mu sync.Mutex +} + +func (r *Records) AddRecord(record *rppb.Record) { + r.mu.Lock() + defer r.mu.Unlock() + r.Records = append(r.Records, record) +} + // Reader is a struct to manage access to the irr database file. type Reader struct { reader *bufio.Reader @@ -328,7 +340,7 @@ func (r *Reader) initRecord() (*rppb.Record, error) { } // Parse parses through the content sending resulting records into a channel to the caller. -func Parse(rdr *Reader, rc chan<- *rppb.Record) { +func Parse(rdr *Reader, records *Records) { // Read the file content, return all accumulated records. // Return in case of parsing errors on record/keyword type. // Return in case of reading error. @@ -348,7 +360,7 @@ func Parse(rdr *Reader, rc chan<- *rppb.Record) { err := rdr.consumeColon() if err != nil { - rc <- rec + records.AddRecord(rec) glog.Infof("failed to consume a key's colon separator: %v", err) break } @@ -356,7 +368,7 @@ func Parse(rdr *Reader, rc chan<- *rppb.Record) { val, re, err := rdr.readValue() if err != nil { addKV(rec, key, val) - rc <- rec + records.AddRecord(rec) if err == io.EOF { // EOF in a read means moving to the next file. glog.Infof("found an EOF while reading a value: %v", err) @@ -373,6 +385,6 @@ func Parse(rdr *Reader, rc chan<- *rppb.Record) { break } } - rc <- rec + records.AddRecord(rec) } } diff --git a/rpsl-tools/main.go b/rpsl-tools/main.go index 7f182fa..6f7be12 100644 --- a/rpsl-tools/main.go +++ b/rpsl-tools/main.go @@ -27,10 +27,10 @@ import ( "os" "strings" "sync" + "unicode" "github.com/golang/glog" "github.com/manrs-tools/contrib/rpsl-parser" - rppb "github.com/manrs-tools/contrib/rpsl-parser/proto" ) var ( @@ -76,7 +76,7 @@ func getReader(fn string) (io.Reader, error) { // parseFile reads and parses files as their filenames arrive on // the input channel. A parse error will abort processing for the // corresponding file and move to the next one -func parseFile(fn string, sem chan int, rc chan *rppb.Record, wg *sync.WaitGroup) { +func parseFile(fn string, sem chan int, records *rpsl.Records, wg *sync.WaitGroup) { defer wg.Done() sem <- 1 @@ -106,7 +106,7 @@ func parseFile(fn string, sem chan int, rc chan *rppb.Record, wg *sync.WaitGroup // The file must start with a letter, all IRR records start with a letter character. r := rdr.Peek() - if !rpsl.IsLetter(r) { + if !unicode.IsLetter(r) { glog.Infof("The first character read(%v) is not a letter, file unparsable.\n", string(r)) // Add 2 more chars so finding the problem is more possible. r, _, _ := rdr.Read() @@ -118,7 +118,7 @@ func parseFile(fn string, sem chan int, rc chan *rppb.Record, wg *sync.WaitGroup } // Parse the file, sending results back up the channel (rc). - rpsl.Parse(rdr, rc) + rpsl.Parse(rdr, records) <-sem } @@ -128,6 +128,7 @@ func parseFile(fn string, sem chan int, rc chan *rppb.Record, wg *sync.WaitGroup func main() { //defer profile.Start(profile.CPUProfile, profile.ProfilePath(".")).Stop() //defer profile.Start(profile.MemProfile, profile.ProfilePath(".")).Stop() + //defer profile.Start(profile.TraceProfile, profile.ProfilePath(".")).Stop() flag.Var(&rpslFiles, "rpslFiles", "Files to parse, irr/rpsl content, filenames as csv.") flag.Parse() @@ -143,27 +144,15 @@ func main() { *threads = numFiles } + var records rpsl.Records + sem := make(chan int, *threads) var wg sync.WaitGroup wg.Add(numFiles) - - rc := make(chan *rppb.Record) - - var records []*rppb.Record - go func(ch <-chan *rppb.Record) { - for v := range ch { - if *display { - fmt.Printf("Received record: %v\n", v) - } - records = append(records, v) - } - }(rc) - for _, fn := range rpslFiles { - go parseFile(fn, sem, rc, &wg) + go parseFile(fn, sem, &records, &wg) } wg.Wait() - close(rc) - fmt.Printf("Received a total of %d records\n", len(records)) + fmt.Printf("Received a total of %d records\n", len(records.Records)) }