Skip to content

Commit c9a891c

Browse files
Merge pull request #398 from gliderlabs/master
release 3.2.5
2 parents acfb302 + 3597569 commit c9a891c

File tree

8 files changed

+520
-5
lines changed

8 files changed

+520
-5
lines changed

CHANGELOG.md

+7-2
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,11 @@ All notable changes to this project will be documented in this file.
1010

1111
### Changed
1212

13-
## [v3.2.4] - 2018-01-16
13+
## [v3.2.5] - 2018-06-05
14+
- @gmelika panic if reconnect fails
15+
- @masterada Added multiline adapter
16+
- @billimek sleeping and syncing to fix issues with docker hub builds
17+
1418
### Fixed
1519
- @michaelshobbs fix working_directory so we don't duplicate test runs
1620

@@ -178,7 +182,8 @@ All notable changes to this project will be documented in this file.
178182
- Base container is now Alpine
179183
- Moved to gliderlabs organization
180184

181-
[unreleased]: https://github.com/gliderlabs/logspout/compare/v3.2.4...HEAD
185+
[unreleased]: https://github.com/gliderlabs/logspout/compare/v3.2.5...HEAD
186+
[v3.2.5]: https://github.com/gliderlabs/logspout/compare/v3.2.4...v3.2.5
182187
[v3.2.4]: https://github.com/gliderlabs/logspout/compare/v3.2.3...v3.2.4
183188
[v3.2.3]: https://github.com/gliderlabs/logspout/compare/v3.2.2...v3.2.3
184189
[v3.2.2]: https://github.com/gliderlabs/logspout/compare/v3.2.1...v3.2.2

Dockerfile

+1-1
Original file line numberDiff line numberDiff line change
@@ -8,4 +8,4 @@ RUN cd /src && ./build.sh "$(cat VERSION)"
88

99
ONBUILD COPY ./build.sh /src/build.sh
1010
ONBUILD COPY ./modules.go /src/modules.go
11-
ONBUILD RUN cd /src && chmod +x ./build.sh && ./build.sh "$(cat VERSION)-custom"
11+
ONBUILD RUN cd /src && chmod +x ./build.sh && sleep 1 && sync && ./build.sh "$(cat VERSION)-custom"

README.md

+31
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,32 @@ See [routesapi module](http://github.com/gliderlabs/logspout/blob/master/routesa
135135

136136
Logspout relies on the Docker API to retrieve container logs. A failure in the API may cause a log stream to hang. Logspout can detect and restart inactive Docker log streams. Use the environment variable `INACTIVITY_TIMEOUT` to enable this feature. E.g.: `INACTIVITY_TIMEOUT=1m` for a 1-minute threshold.
137137

138+
#### Multiline logging
139+
140+
In order to enable multiline logging, you must first prefix your adapter with the multiline adapter:
141+
142+
$ docker run \
143+
--volume=/var/run/docker.sock:/var/run/docker.sock \
144+
gliderlabs/logspout \
145+
multiline+raw://192.168.10.10:5000?filter.name=*_db
146+
147+
Using the the above prefix enables multiline logging on all containers by default. To enable it only to specific containers set MULTILINE_ENABLE_DEFAULT=false for logspout, and use the LOGSPOUT_MULTILINE environment variable on the monitored container:
148+
149+
$ docker run -d -e 'LOGSPOUT_MULTILINE=true' image
150+
151+
##### MULTILINE_MATCH
152+
153+
Using the environment variable `MULTILINE_MATCH`=<first|last|nonfirst|nonlast> (default `nonfirst`) you define, which lines should be matched to the `MULTILINE_PATTERN`.
154+
* first: match first line only and append following messages until you match another line
155+
* last: concatenate all messages until the pattern matches the next line
156+
* nonlast: match a line, append upcoming matching lines, also append first non-matching line and start
157+
* nonfirst: append all matching lines to first line and start over with the next non-matching line
158+
159+
##### Important!
160+
If you use multiline logging with raw, it's recommended to json encode the Data to avoid linebreaks in the output, eg:
161+
162+
"RAW_FORMAT={{ toJSON .Data }}\n"
163+
138164
#### Environment variables
139165

140166
* `ALLOW_TTY` - include logs from containers started with `-t` or `--tty` (i.e. `Allocate a pseudo-TTY`)
@@ -155,6 +181,11 @@ Logspout relies on the Docker API to retrieve container logs. A failure in the A
155181
* `SYSLOG_STRUCTURED_DATA` - datum for structured data field
156182
* `SYSLOG_TAG` - datum for tag field (default `{{.ContainerName}}+route.Options["append_tag"]`)
157183
* `SYSLOG_TIMESTAMP` - datum for timestamp field (default `{{.Timestamp}}`)
184+
* `MULTILINE_ENABLE_DEFAULT` - enable multiline logging for all containers when using the multiline adapter (default `true`)
185+
* `MULTILINE_MATCH` - determines which lines the pattern should match, one of first|last|nonfirst|nonlast, for details see: [MULTILINE_MATCH](#multiline_match) (default `nonfirst`)
186+
* `MULTILINE_PATTERN` - pattern for multiline logging, see: [MULTILINE_MATCH](#multiline_match) (default: `^\s`)
187+
* `MULTILINE_FLUSH_AFTER` - maximum time between the first and last lines of a multiline log entry in milliseconds (default: 500)
188+
* `MULTILINE_SEPARATOR` - separator between lines for output (default: `\n`)
158189

159190
#### Raw Format
160191

VERSION

+1-1
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
v3.2.4
1+
v3.2.5

adapters/multiline/multiline.go

+247
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,247 @@
1+
package multiline
2+
3+
import (
4+
"errors"
5+
"os"
6+
"regexp"
7+
"strconv"
8+
"strings"
9+
"sync"
10+
"time"
11+
12+
"github.com/fsouza/go-dockerclient"
13+
"github.com/gliderlabs/logspout/router"
14+
)
15+
16+
const (
17+
matchFirst = "first"
18+
matchLast = "last"
19+
matchNonFirst = "nonfirst"
20+
matchNonLast = "nonlast"
21+
)
22+
23+
func init() {
24+
router.AdapterFactories.Register(NewMultilineAdapter, "multiline")
25+
}
26+
27+
// Adapter collects multi-lint log entries and sends them to the next adapter as a single entry
28+
type Adapter struct {
29+
out chan *router.Message
30+
subAdapter router.LogAdapter
31+
enableByDefault bool
32+
pattern *regexp.Regexp
33+
separator string
34+
matchFirstLine bool
35+
negateMatch bool
36+
flushAfter time.Duration
37+
checkInterval time.Duration
38+
buffers map[string]*router.Message
39+
nextCheck <-chan time.Time
40+
}
41+
42+
// NewMultilineAdapter returns a configured multiline.Adapter
43+
func NewMultilineAdapter(route *router.Route) (a router.LogAdapter, err error) {
44+
enableByDefault := true
45+
enableStr := os.Getenv("MULTILINE_ENABLE_DEFAULT")
46+
if enableStr != "" {
47+
var err error
48+
enableByDefault, err = strconv.ParseBool(enableStr)
49+
if err != nil {
50+
return nil, errors.New("multiline: invalid value for MULTILINE_ENABLE_DEFAULT (must be true|false): " + enableStr)
51+
}
52+
}
53+
54+
pattern := os.Getenv("MULTILINE_PATTERN")
55+
if pattern == "" {
56+
pattern = `^\s`
57+
}
58+
59+
separator := os.Getenv("MULTILINE_SEPARATOR")
60+
if separator == "" {
61+
separator = "\n"
62+
}
63+
patternRegexp, err := regexp.Compile(pattern)
64+
if err != nil {
65+
return nil, errors.New("multiline: invalid value for MULTILINE_PATTERN (must be regexp): " + pattern)
66+
}
67+
68+
matchType := os.Getenv("MULTILINE_MATCH")
69+
if matchType == "" {
70+
matchType = matchNonFirst
71+
}
72+
matchType = strings.ToLower(matchType)
73+
matchFirstLine := false
74+
negateMatch := false
75+
switch matchType {
76+
case matchFirst:
77+
matchFirstLine = true
78+
negateMatch = false
79+
case matchLast:
80+
matchFirstLine = false
81+
negateMatch = false
82+
case matchNonFirst:
83+
matchFirstLine = true
84+
negateMatch = true
85+
case matchNonLast:
86+
matchFirstLine = false
87+
negateMatch = true
88+
default:
89+
return nil, errors.New("multiline: invalid value for MULTILINE_MATCH (must be one of first|last|nonfirst|nonlast): " + matchType)
90+
}
91+
92+
flushAfter := 500 * time.Millisecond
93+
flushAfterStr := os.Getenv("MULTILINE_FLUSH_AFTER")
94+
if flushAfterStr != "" {
95+
timeoutMS, err := strconv.Atoi(flushAfterStr)
96+
if err != nil {
97+
return nil, errors.New("multiline: invalid value for multiline_timeout (must be number): " + flushAfterStr)
98+
}
99+
flushAfter = time.Duration(timeoutMS) * time.Millisecond
100+
}
101+
102+
parts := strings.SplitN(route.Adapter, "+", 2)
103+
if len(parts) != 2 {
104+
return nil, errors.New("multiline: adapter must have a sub-adapter, eg: multiline+raw+tcp")
105+
}
106+
107+
originalAdapter := route.Adapter
108+
route.Adapter = parts[1]
109+
factory, found := router.AdapterFactories.Lookup(route.AdapterType())
110+
if !found {
111+
return nil, errors.New("bad adapter: " + originalAdapter)
112+
}
113+
subAdapter, err := factory(route)
114+
if err != nil {
115+
return nil, err
116+
}
117+
route.Adapter = originalAdapter
118+
119+
out := make(chan *router.Message)
120+
checkInterval := flushAfter / 2
121+
122+
return &Adapter{
123+
out: out,
124+
subAdapter: subAdapter,
125+
enableByDefault: enableByDefault,
126+
pattern: patternRegexp,
127+
separator: separator,
128+
matchFirstLine: matchFirstLine,
129+
negateMatch: negateMatch,
130+
flushAfter: flushAfter,
131+
checkInterval: checkInterval,
132+
buffers: make(map[string]*router.Message),
133+
nextCheck: time.After(checkInterval),
134+
}, nil
135+
}
136+
137+
// Stream sends log data to the next adapter
138+
func (a *Adapter) Stream(logstream chan *router.Message) {
139+
wg := sync.WaitGroup{}
140+
wg.Add(1)
141+
go func() {
142+
a.subAdapter.Stream(a.out)
143+
wg.Done()
144+
}()
145+
defer func() {
146+
for _, message := range a.buffers {
147+
a.out <- message
148+
}
149+
150+
close(a.out)
151+
wg.Wait()
152+
}()
153+
154+
for {
155+
select {
156+
case message, ok := <-logstream:
157+
if !ok {
158+
return
159+
}
160+
161+
if !multilineContainer(message.Container, a.enableByDefault) {
162+
a.out <- message
163+
continue
164+
}
165+
166+
cID := message.Container.ID
167+
old, oldExists := a.buffers[cID]
168+
if a.isFirstLine(message) {
169+
if oldExists {
170+
a.out <- old
171+
}
172+
173+
a.buffers[cID] = message
174+
} else {
175+
isLastLine := a.isLastLine(message)
176+
177+
if oldExists {
178+
old.Data += a.separator + message.Data
179+
message = old
180+
}
181+
182+
if isLastLine {
183+
a.out <- message
184+
if oldExists {
185+
delete(a.buffers, cID)
186+
}
187+
} else {
188+
a.buffers[cID] = message
189+
}
190+
}
191+
case <-a.nextCheck:
192+
now := time.Now()
193+
194+
for key, message := range a.buffers {
195+
if message.Time.Add(a.flushAfter).After(now) {
196+
a.out <- message
197+
delete(a.buffers, key)
198+
}
199+
}
200+
201+
a.nextCheck = time.After(a.checkInterval)
202+
}
203+
}
204+
}
205+
206+
func (a *Adapter) isFirstLine(message *router.Message) bool {
207+
if !a.matchFirstLine {
208+
return false
209+
}
210+
211+
match := a.pattern.MatchString(message.Data)
212+
if a.negateMatch {
213+
return !match
214+
}
215+
216+
return match
217+
}
218+
219+
func (a *Adapter) isLastLine(message *router.Message) bool {
220+
if a.matchFirstLine {
221+
return false
222+
}
223+
224+
match := a.pattern.MatchString(message.Data)
225+
if a.negateMatch {
226+
return !match
227+
}
228+
229+
return match
230+
}
231+
232+
func multilineContainer(container *docker.Container, def bool) bool {
233+
for _, kv := range container.Config.Env {
234+
kvp := strings.SplitN(kv, "=", 2)
235+
if len(kvp) == 2 && kvp[0] == "LOGSPOUT_MULTILINE" {
236+
switch strings.ToLower(kvp[1]) {
237+
case "true":
238+
return true
239+
case "false":
240+
return false
241+
}
242+
return def
243+
}
244+
}
245+
246+
return def
247+
}

0 commit comments

Comments
 (0)