Skip to content

Commit e25c67e

Browse files
committed
Determine the fluentd async option based on Docker server version
Docker's fluentd log driver supports passing an "async" option. The "fluentd-async-connect" option was deprecated in Docker v20.10.0 and completely removed in v28.0.0. Reference: https://docs.docker.com/engine/deprecated/#fluentd-async-connect-log-opt Functionally, both options are equivalent, see: moby/moby#39086. Since Docker's refactor of the fluentd async option was not versioned and affects all API versions, we need to determine the appropriate option using the Docker Server version, in order to be backwards compatible.
1 parent 4e2b3fd commit e25c67e

File tree

3 files changed

+183
-22
lines changed

3 files changed

+183
-22
lines changed

agent/dockerclient/dockerapi/docker_client.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1300,6 +1300,7 @@ func (dg *dockerGoClient) Version(ctx context.Context, timeout time.Duration) (s
13001300
}
13011301

13021302
version = info.Version
1303+
seelog.Debugf("Determined the Docker server version: %s", version)
13031304
dg.setDaemonVersion(version)
13041305
return version, nil
13051306
}

agent/engine/docker_task_engine.go

Lines changed: 78 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,7 @@ const (
9797
logDriverFluentdAddress = "fluentd-address"
9898
dataLogDriverPath = "/data/firelens/"
9999
logDriverAsyncConnect = "fluentd-async-connect"
100+
logDriverAsync = "fluentd-async"
100101
logDriverSubSecondPrecision = "fluentd-sub-second-precision"
101102
logDriverBufferLimit = "fluentd-buffer-limit"
102103
dataLogDriverSocketPath = "/socket/fluent.sock"
@@ -1874,7 +1875,35 @@ func (engine *DockerTaskEngine) createContainer(task *apitask.Task, container *a
18741875
// Update the environment variables FLUENT_HOST and FLUENT_PORT depending on the supported network modes - bridge
18751876
// and awsvpc. For reference - https://docs.docker.com/config/containers/logging/fluentd/.
18761877
if hostConfig.LogConfig.Type == logDriverTypeFirelens {
1877-
hostConfig.LogConfig = getFirelensLogConfig(task, container, hostConfig, engine.cfg)
1878+
// We need the Docker server version in order to generate the appropriate firelens log config
1879+
dockerServerVersion, err := engine.Version()
1880+
if err != nil {
1881+
logger.Error("Failed to determine Docker server version for Firelens log config generation", logger.Fields{
1882+
field.TaskID: task.GetID(),
1883+
field.Container: container.Name,
1884+
field.Error: err,
1885+
})
1886+
return dockerapi.DockerContainerMetadata{
1887+
Error: dockerapi.CannotCreateContainerError{FromError: errors.Wrapf(versionErr,
1888+
"failed to create container - container uses awsfirelens log driver and we failed to "+
1889+
"determine the Docker server version for Firelens log config generation")},
1890+
}
1891+
}
1892+
logConfig, err := getFirelensLogConfig(task, container, hostConfig, engine.cfg, dockerServerVersion)
1893+
if err != nil {
1894+
logger.Error("Failed to generate the Firelens log config", logger.Fields{
1895+
field.TaskID: task.GetID(),
1896+
field.Container: container.Name,
1897+
field.Error: err,
1898+
})
1899+
return dockerapi.DockerContainerMetadata{
1900+
Error: dockerapi.CannotCreateContainerError{FromError: errors.Wrapf(err,
1901+
"failed to create container - container uses awsfirelens log driver and we failed to "+
1902+
"generate the Firelens log config")},
1903+
}
1904+
}
1905+
hostConfig.LogConfig = logConfig
1906+
18781907
if task.IsNetworkModeAWSVPC() {
18791908
container.MergeEnvironmentVariables(map[string]string{
18801909
fluentNetworkHost: FluentAWSVPCHostValue,
@@ -2108,29 +2137,64 @@ func (engine *DockerTaskEngine) createContainer(task *apitask.Task, container *a
21082137
return metadata
21092138
}
21102139

2111-
func getFirelensLogConfig(task *apitask.Task, container *apicontainer.Container,
2112-
hostConfig *dockercontainer.HostConfig, cfg *config.Config) dockercontainer.LogConfig {
2140+
// getFirelensLogConfig generates the fluentd log driver's log config.
2141+
// Every container that wants to use Firelens for logging, gets one instance of the fluentd log driver associated to it.
2142+
func getFirelensLogConfig(task *apitask.Task,
2143+
container *apicontainer.Container,
2144+
hostConfig *dockercontainer.HostConfig,
2145+
cfg *config.Config, dockerServerVersion string) (dockercontainer.LogConfig, error) {
2146+
var firelensConfig dockercontainer.LogConfig
2147+
// Set the log driver type
2148+
firelensConfig.Type = logDriverTypeFluentd
2149+
// Start from the existing container host config
2150+
hostConfigLogConfig := hostConfig.LogConfig
2151+
// Initialize a config to store the different log driver options
2152+
firelensConfig.Config = make(map[string]string)
2153+
// Generate a tag based on the task ID
21132154
fields := strings.Split(task.Arn, "/")
21142155
taskID := fields[len(fields)-1]
21152156
tag := fmt.Sprintf(fluentTagDockerFormat, container.Name, taskID)
2157+
firelensConfig.Config[logDriverTag] = tag
2158+
// Construct the fluent socket address
21162159
fluentd := socketPathPrefix + filepath.Join(cfg.DataDirOnHost, dataLogDriverPath, taskID, dataLogDriverSocketPath)
2117-
logConfig := hostConfig.LogConfig
2118-
bufferLimit, bufferLimitExists := logConfig.Config[apitask.FirelensLogDriverBufferLimitOption]
2119-
logConfig.Type = logDriverTypeFluentd
2120-
logConfig.Config = make(map[string]string)
2121-
logConfig.Config[logDriverTag] = tag
2122-
logConfig.Config[logDriverFluentdAddress] = fluentd
2123-
logConfig.Config[logDriverAsyncConnect] = strconv.FormatBool(cfg.FirelensAsyncEnabled.Enabled())
2124-
logConfig.Config[logDriverSubSecondPrecision] = strconv.FormatBool(true)
2160+
firelensConfig.Config[logDriverFluentdAddress] = fluentd
2161+
// Enable sub-second precision
2162+
firelensConfig.Config[logDriverSubSecondPrecision] = strconv.FormatBool(true)
2163+
// Set the log driver buffer limit if passed via the task payload
2164+
bufferLimit, bufferLimitExists := hostConfigLogConfig.Config[apitask.FirelensLogDriverBufferLimitOption]
21252165
if bufferLimitExists {
2126-
logConfig.Config[logDriverBufferLimit] = bufferLimit
2166+
firelensConfig.Config[logDriverBufferLimit] = bufferLimit
2167+
}
2168+
// Determine whether to use the "fluentd-async" option or the legacy "fluentd-async-connect" option.
2169+
// "fluentd-async-connect" option was deprecated in Docker v20.10.0 and removed in v28.0.0.
2170+
// It was replaced with the "fluentd-async" option starting Docker v20.10.0.
2171+
// Docker v20.10.0 release notes: https://docs.docker.com/engine/release-notes/20.10/#logging-2
2172+
// Docker v28.0.0 release notes: https://docs.docker.com/engine/release-notes/28/#removed
2173+
// This change is not versioned and applies to all Docker client API versions.
2174+
// Hence, in order to preserve backwards-compatibility with Docker server versions older than v20.10.0,
2175+
// we need to continue using the older fluentd-async-connect option.
2176+
isAsyncCompatible, err := utils.Version(dockerServerVersion).Matches(">=20.10.0")
2177+
if err != nil {
2178+
logger.Error("Unable to determine whether the Docker server version is at least 20.10.0", logger.Fields{
2179+
field.TaskID: task.GetID(),
2180+
field.Container: container.Name,
2181+
field.Error: err,
2182+
})
2183+
return dockercontainer.LogConfig{}, errors.Wrapf(err,
2184+
"unable to determine whether the Docker server version is at least 20.10.0")
21272185
}
2186+
if isAsyncCompatible {
2187+
firelensConfig.Config[logDriverAsync] = strconv.FormatBool(cfg.FirelensAsyncEnabled.Enabled())
2188+
} else {
2189+
firelensConfig.Config[logDriverAsyncConnect] = strconv.FormatBool(cfg.FirelensAsyncEnabled.Enabled())
2190+
}
2191+
21282192
logger.Debug("Applying firelens log config for container", logger.Fields{
21292193
field.TaskID: task.GetID(),
21302194
field.Container: container.Name,
2131-
"config": logConfig,
2195+
"config": firelensConfig,
21322196
})
2133-
return logConfig
2197+
return firelensConfig, nil
21342198
}
21352199

21362200
func (engine *DockerTaskEngine) startContainer(task *apitask.Task, container *apicontainer.Container) dockerapi.DockerContainerMetadata {

agent/engine/docker_task_engine_test.go

Lines changed: 104 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,7 @@ const (
115115
serviceConnectContainerName = "service-connect"
116116
mediaTypeManifestV2 = "application/vnd.docker.distribution.manifest.v2+json"
117117
defaultIfname = "eth0"
118+
testDockerServerVersion = "25.0.8"
118119
)
119120

120121
var (
@@ -3321,7 +3322,7 @@ func TestCreateContainerAddFirelensLogDriverConfig(t *testing.T) {
33213322
expectedLogConfigType string
33223323
expectedLogConfigTag string
33233324
expectedLogConfigFluentAddress string
3324-
expectedFluentdAsyncConnect string
3325+
expectedFluentdAsync string
33253326
expectedSubSecondPrecision string
33263327
expectedBufferLimit string
33273328
expectedIPAddress string
@@ -3333,7 +3334,7 @@ func TestCreateContainerAddFirelensLogDriverConfig(t *testing.T) {
33333334
enableServiceConnect: false,
33343335
expectedLogConfigType: logDriverTypeFluentd,
33353336
expectedLogConfigTag: taskName + "-firelens-" + taskID,
3336-
expectedFluentdAsyncConnect: strconv.FormatBool(true),
3337+
expectedFluentdAsync: strconv.FormatBool(true),
33373338
expectedSubSecondPrecision: strconv.FormatBool(true),
33383339
expectedBufferLimit: "10000",
33393340
expectedLogConfigFluentAddress: socketPathPrefix + filepath.Join(defaultConfig.DataDirOnHost, dataLogDriverPath, taskID, dataLogDriverSocketPath),
@@ -3346,7 +3347,7 @@ func TestCreateContainerAddFirelensLogDriverConfig(t *testing.T) {
33463347
enableServiceConnect: false,
33473348
expectedLogConfigType: logDriverTypeFluentd,
33483349
expectedLogConfigTag: taskName + "-firelens-" + taskID,
3349-
expectedFluentdAsyncConnect: strconv.FormatBool(true),
3350+
expectedFluentdAsync: strconv.FormatBool(true),
33503351
expectedSubSecondPrecision: strconv.FormatBool(true),
33513352
expectedBufferLimit: "10000",
33523353
expectedLogConfigFluentAddress: socketPathPrefix + filepath.Join(defaultConfig.DataDirOnHost, dataLogDriverPath, taskID, dataLogDriverSocketPath),
@@ -3359,7 +3360,7 @@ func TestCreateContainerAddFirelensLogDriverConfig(t *testing.T) {
33593360
enableServiceConnect: true,
33603361
expectedLogConfigType: logDriverTypeFluentd,
33613362
expectedLogConfigTag: taskName + "-firelens-" + taskID,
3362-
expectedFluentdAsyncConnect: strconv.FormatBool(true),
3363+
expectedFluentdAsync: strconv.FormatBool(true),
33633364
expectedSubSecondPrecision: strconv.FormatBool(true),
33643365
expectedBufferLimit: "10000",
33653366
expectedLogConfigFluentAddress: socketPathPrefix + filepath.Join(defaultConfig.DataDirOnHost, dataLogDriverPath, taskID, dataLogDriverSocketPath),
@@ -3372,7 +3373,7 @@ func TestCreateContainerAddFirelensLogDriverConfig(t *testing.T) {
33723373
enableServiceConnect: false,
33733374
expectedLogConfigType: logDriverTypeFluentd,
33743375
expectedLogConfigTag: taskName + "-firelens-" + taskID,
3375-
expectedFluentdAsyncConnect: strconv.FormatBool(true),
3376+
expectedFluentdAsync: strconv.FormatBool(true),
33763377
expectedSubSecondPrecision: strconv.FormatBool(true),
33773378
expectedBufferLimit: "",
33783379
expectedLogConfigFluentAddress: socketPathPrefix + filepath.Join(defaultConfig.DataDirOnHost, dataLogDriverPath, taskID, dataLogDriverSocketPath),
@@ -3388,6 +3389,7 @@ func TestCreateContainerAddFirelensLogDriverConfig(t *testing.T) {
33883389
ctrl, client, _, taskEngine, _, _, _, serviceConnectManager := mocks(t, ctx, &defaultConfig)
33893390
defer ctrl.Finish()
33903391

3392+
client.EXPECT().Version(gomock.Any(), gomock.Any()).Return(testDockerServerVersion, nil).Times(1)
33913393
client.EXPECT().APIVersion().Return(defaultDockerClientAPIVersion, nil).AnyTimes()
33923394
if tc.enableServiceConnect {
33933395
serviceConnectManager.EXPECT().AugmentTaskContainer(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).Times(1)
@@ -3401,7 +3403,7 @@ func TestCreateContainerAddFirelensLogDriverConfig(t *testing.T) {
34013403
assert.Equal(t, tc.expectedLogConfigType, hostConfig.LogConfig.Type)
34023404
assert.Equal(t, tc.expectedLogConfigTag, hostConfig.LogConfig.Config["tag"])
34033405
assert.Equal(t, tc.expectedLogConfigFluentAddress, hostConfig.LogConfig.Config["fluentd-address"])
3404-
assert.Equal(t, tc.expectedFluentdAsyncConnect, hostConfig.LogConfig.Config["fluentd-async-connect"])
3406+
assert.Equal(t, tc.expectedFluentdAsync, hostConfig.LogConfig.Config["fluentd-async"])
34053407
assert.Equal(t, tc.expectedSubSecondPrecision, hostConfig.LogConfig.Config["fluentd-sub-second-precision"])
34063408
assert.Equal(t, tc.expectedBufferLimit, hostConfig.LogConfig.Config["fluentd-buffer-limit"])
34073409
assert.Contains(t, config.Env, tc.expectedIPAddress)
@@ -5211,8 +5213,102 @@ func TestGetFirelensConfigWithAsyncEnabledConfigOption(t *testing.T) {
52115213
cfg := &config.Config{
52125214
FirelensAsyncEnabled: asyncEnabled,
52135215
}
5214-
logConfig := getFirelensLogConfig(task, appContainer, rawHostConfigInput, cfg)
5215-
assert.Equal(t, tc.isFirelensAsyncEnabled, logConfig.Config[logDriverAsyncConnect])
5216+
logConfig, err := getFirelensLogConfig(task, appContainer, rawHostConfigInput, cfg,
5217+
testDockerServerVersion)
5218+
assert.NoError(t, err)
5219+
assert.Equal(t, tc.isFirelensAsyncEnabled, logConfig.Config[logDriverAsync])
5220+
})
5221+
}
5222+
}
5223+
5224+
// TestGetFirelensConfigBasedOnDockerServerVersion validates that GetFirelensConfig populates the correct fluentd async
5225+
// option depending on the Docker Server version provided.
5226+
func TestGetFirelensConfigBasedOnDockerServerVersion(t *testing.T) {
5227+
rawHostConfigInput := &dockercontainer.HostConfig{
5228+
LogConfig: dockercontainer.LogConfig{
5229+
Type: "awsfirelens",
5230+
Config: map[string]string{
5231+
"log-driver-buffer-limit": "10000",
5232+
},
5233+
},
5234+
}
5235+
5236+
rawHostConfig, err := json.Marshal(&rawHostConfigInput)
5237+
require.NoError(t, err)
5238+
hostConfig := func() *string {
5239+
s := string(rawHostConfig)
5240+
return &s
5241+
}()
5242+
5243+
appContainer := &apicontainer.Container{
5244+
Name: "app",
5245+
DockerConfig: apicontainer.DockerConfig{
5246+
HostConfig: hostConfig,
5247+
},
5248+
}
5249+
5250+
firelensContainer := &apicontainer.Container{
5251+
Name: "firelens",
5252+
FirelensConfig: &apicontainer.FirelensConfig{
5253+
Type: "fluentbit",
5254+
},
5255+
}
5256+
5257+
task := &apitask.Task{
5258+
Arn: "arn:aws:ecs:region:account-id:task/task-id",
5259+
Containers: []*apicontainer.Container{
5260+
appContainer,
5261+
firelensContainer,
5262+
},
5263+
}
5264+
5265+
testCases := []struct {
5266+
name string
5267+
dockerServerVersion string
5268+
expectError bool
5269+
includedAsyncOption string
5270+
excludedAsyncOption string
5271+
}{
5272+
{
5273+
name: "use fluentd-async",
5274+
dockerServerVersion: testDockerServerVersion,
5275+
expectError: false,
5276+
includedAsyncOption: logDriverAsync,
5277+
excludedAsyncOption: logDriverAsyncConnect,
5278+
},
5279+
{
5280+
name: "use fluentd-async-connect",
5281+
dockerServerVersion: "19.03.13-ce",
5282+
expectError: false,
5283+
includedAsyncOption: logDriverAsyncConnect,
5284+
excludedAsyncOption: logDriverAsync,
5285+
},
5286+
{
5287+
name: "error while determining Docker server version",
5288+
dockerServerVersion: "",
5289+
expectError: true,
5290+
includedAsyncOption: "",
5291+
excludedAsyncOption: "",
5292+
},
5293+
}
5294+
5295+
for _, tc := range testCases {
5296+
t.Run(tc.name, func(t *testing.T) {
5297+
5298+
cfg := &config.Config{
5299+
FirelensAsyncEnabled: config.BooleanDefaultTrue{Value: config.ExplicitlyEnabled},
5300+
}
5301+
logConfig, configError := getFirelensLogConfig(task, appContainer, rawHostConfigInput, cfg,
5302+
tc.dockerServerVersion)
5303+
assert.Equal(t, tc.expectError, configError != nil)
5304+
// Verify whether the logConfig.Config map contains the right fluentd async key and value
5305+
if tc.includedAsyncOption != "" {
5306+
assert.Contains(t, logConfig.Config, tc.includedAsyncOption)
5307+
assert.Equal(t, "true", logConfig.Config[tc.includedAsyncOption])
5308+
} else {
5309+
assert.NotContains(t, logConfig.Config, tc.includedAsyncOption)
5310+
}
5311+
assert.NotContains(t, logConfig.Config, tc.excludedAsyncOption)
52165312
})
52175313
}
52185314
}

0 commit comments

Comments
 (0)