Skip to content

Commit 26fd2aa

Browse files
committed
Add support to get paginated response
Change the search api, so it returns channels, where we push new message on each page Add the -maxPages argument to the command, to limit how many pages should be paginated
1 parent 6ffa74e commit 26fd2aa

File tree

2 files changed

+118
-40
lines changed

2 files changed

+118
-40
lines changed

loggly.go

Lines changed: 58 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ const usage = `
2929
-to <time> ending time [now]
3030
-count print total event count
3131
-all print the entire loggly event instead of just the message
32+
-maxPages <count> maximum number of pages to query [3]
3233
-version print version information
3334
3435
Operators:
@@ -61,6 +62,7 @@ var flags = flag.NewFlagSet("loggly", flag.ExitOnError)
6162
var count = flags.Bool("count", false, "")
6263
var versionQuery = flags.Bool("version", false, "")
6364
var account = flags.String("account", "", "")
65+
var maxPages = flags.Int("maxPages", 3, "")
6466
var token = flags.String("token", "", "")
6567
var size = flags.Int("size", 100, "")
6668
var from = flags.String("from", "-24h", "")
@@ -89,14 +91,15 @@ func check(err error) {
8991
}
9092

9193
func printJSON(events []interface{}) error {
92-
data, err := json.Marshal(events)
94+
for _, event := range events {
95+
data, err := json.Marshal(event)
96+
if err != nil {
97+
return err
98+
}
9399

94-
if err != nil {
95-
return err
100+
fmt.Println(string(data))
96101
}
97102

98-
fmt.Println(string(data))
99-
100103
return nil
101104
}
102105

@@ -116,6 +119,53 @@ func printLogMSG(events []interface{}) error {
116119
return printJSON(ret)
117120
}
118121

122+
func execCount(query string, from string, to string) {
123+
c := search.New(*account, *token)
124+
res, err := c.Query(query).Size(1).From(from).To(to).Fetch()
125+
for {
126+
select {
127+
case r := <-res:
128+
fmt.Println(r.Total)
129+
return
130+
case e := <-err:
131+
check(e)
132+
return
133+
}
134+
}
135+
}
136+
137+
func printRes(res search.Response) {
138+
if *allMsg {
139+
check(printJSON(res.Events))
140+
} else {
141+
if err := printLogMSG(res.Events); err != nil {
142+
fmt.Fprintf(os.Stderr, "Invalid JSON in the 'logmsg' field. Consider to filter the messages, or use the -all flag and parse the message yourself.\n\n%s", err.Error())
143+
}
144+
}
145+
}
146+
147+
func sendQuery(query string, size int, from string, to string, maxPages int) {
148+
doneChan := make(chan error)
149+
150+
c := search.New(*account, *token)
151+
res, err := c.Query(query).Size(size).From(from).To(to).MaxPage(maxPages).Fetch()
152+
153+
go func() {
154+
if e := <-err; e != nil {
155+
doneChan <- e
156+
}
157+
}()
158+
159+
go func() {
160+
for i := range res {
161+
printRes(i)
162+
}
163+
doneChan <- nil
164+
}()
165+
166+
check(<-doneChan)
167+
}
168+
119169
func main() {
120170
flags.Usage = printUsage
121171
flags.Parse(os.Args[1:])
@@ -130,23 +180,11 @@ func main() {
130180

131181
args := flags.Args()
132182
query := strings.Join(args, " ")
133-
c := search.New(*account, *token)
134183

135184
if *count {
136-
res, err := c.Query(query).Size(1).From(*from).To(*to).Fetch()
137-
check(err)
138-
fmt.Println(res.Total)
139-
os.Exit(0)
185+
execCount(query, *from, *to)
186+
return
140187
}
141188

142-
res, err := c.Query(query).Size(*size).From(*from).To(*to).Fetch()
143-
check(err)
144-
145-
if *allMsg {
146-
check(printJSON(res.Events))
147-
} else {
148-
if err := printLogMSG(res.Events); err != nil {
149-
fmt.Fprintf(os.Stderr, "Invalid JSON in the 'logmsg' field. Consider to filter the messages, or use the -all flag and parse the message yourself.\n\n%s", err.Error())
150-
}
151-
}
189+
sendQuery(query, *size, *from, *to, *maxPages)
152190
}

search/search.go

Lines changed: 60 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -28,12 +28,13 @@ type Response struct {
2828

2929
// Query builder struct
3030
type Query struct {
31-
client *Client
32-
query string
33-
from string
34-
until string
35-
order string
36-
size int
31+
client *Client
32+
query string
33+
from string
34+
until string
35+
order string
36+
size int
37+
maxPages int
3738
}
3839

3940
// Create a new query
@@ -59,14 +60,14 @@ func New(account string, token string) *Client {
5960
return c
6061
}
6162

62-
// Url Return the base api url.
63-
func (c *Client) Url() string {
63+
// URL Return the base api url.
64+
func (c *Client) URL() string {
6465
return fmt.Sprintf("https://%s.%s", c.Account, c.Endpoint)
6566
}
6667

6768
// Get the given path.
6869
func (c *Client) Get(path string) (*http.Response, error) {
69-
r, err := http.NewRequest(http.MethodGet, c.Url()+path, nil)
70+
r, err := http.NewRequest(http.MethodGet, c.URL()+path, nil)
7071
if err != nil {
7172
return nil, err
7273
}
@@ -76,7 +77,7 @@ func (c *Client) Get(path string) (*http.Response, error) {
7677
return client.Do(r)
7778
}
7879

79-
// Get json from the given path.
80+
// GetJSON from the given path.
8081
func (c *Client) GetJSON(path string) (j *simplejson.Json, err error) {
8182
res, err := c.Get(path)
8283

@@ -114,15 +115,10 @@ func (c *Client) GetEvents(params string) (*simplejson.Json, error) {
114115

115116
// Search response with total events, page number
116117
// and the events array.
117-
func (c *Client) Search(params string) (*Response, error) {
118-
j, err := c.CreateSearch(params)
119-
120-
if err != nil {
121-
return nil, err
122-
}
123-
118+
func (c *Client) Search(j *simplejson.Json, page int) (*Response, error) {
124119
id := j.GetPath("rsid", "id").MustString()
125-
j, err = c.GetEvents("rsid=" + id)
120+
121+
j, err := c.GetEvents("rsid=" + id + "&page=" + strconv.Itoa(page))
126122

127123
if err != nil {
128124
return nil, err
@@ -135,6 +131,7 @@ func (c *Client) Search(params string) (*Response, error) {
135131
Page: j.Get("page").MustInt64(),
136132
Events: j.Get("events").MustArray(),
137133
}, nil
134+
138135
}
139136

140137
// Query Create a new search query using the fluent api.
@@ -165,6 +162,12 @@ func (q *Query) From(str string) *Query {
165162
return q
166163
}
167164

165+
// MaxPage sets the max page
166+
func (q *Query) MaxPage(maxPages int) *Query {
167+
q.maxPages = maxPages
168+
return q
169+
}
170+
168171
// Until Set until time.
169172
func (q *Query) Until(str string) *Query {
170173
q.until = str
@@ -179,6 +182,43 @@ func (q *Query) To(str string) *Query {
179182

180183
// Fetch Search response with total events, page number
181184
// and the events array.
182-
func (q *Query) Fetch() (*Response, error) {
183-
return q.client.Search(q.String())
185+
func (q *Query) Fetch() (chan Response, chan error) {
186+
resChan := make(chan Response)
187+
errChan := make(chan error)
188+
189+
page := 0
190+
go func() {
191+
defer close(resChan)
192+
defer close(errChan)
193+
j, err := q.client.CreateSearch(q.String())
194+
195+
if err != nil {
196+
errChan <- err
197+
return
198+
}
199+
200+
for {
201+
res, err := q.client.Search(j, page)
202+
203+
if err != nil {
204+
errChan <- err
205+
return
206+
}
207+
208+
resChan <- *res
209+
210+
if page+1 == q.maxPages {
211+
return
212+
}
213+
214+
if len(res.Events) < q.size {
215+
return
216+
}
217+
218+
page++
219+
}
220+
221+
}()
222+
223+
return resChan, errChan
184224
}

0 commit comments

Comments
 (0)