Skip to content

Commit 7d0c4c3

Browse files
leonardo-albertovich authored and edsiper committed
in_tail: make read_from_head after the initial discovery optional
Additionally, when a file is ignored due to ignore_older its size is stored to ensure that if the file is modified we are able to ingest the data that triggered the timestamp update. Signed-off-by: Leonardo Alminana <[email protected]>
1 parent 67ad458 commit 7d0c4c3

File tree

9 files changed

+134
-16
lines changed

9 files changed

+134
-16
lines changed

plugins/in_tail/tail.c

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -396,13 +396,15 @@ static int in_tail_init(struct flb_input_instance *in,
396396
}
397397
#endif
398398

399-
/*
400-
* After the first scan (on start time), all new files discovered needs to be
401-
* read from head, so we switch the 'read_from_head' flag to true so any
402-
* other file discovered after a scan or a rotation are read from the
403-
* beginning.
404-
*/
405-
ctx->read_from_head = FLB_TRUE;
399+
if (ctx->read_newly_discovered_files_from_head) {
400+
/*
401+
* After the first scan (on start time), all newly discovered files need to be
402+
* read from head, so we switch the 'read_from_head' flag to true so any
403+
* other file discovered after a scan or a rotation is read from the
404+
* beginning.
405+
*/
406+
ctx->read_from_head = FLB_TRUE;
407+
}
406408

407409
/* Set plugin context */
408410
flb_input_set_context(in, ctx);
@@ -594,6 +596,12 @@ static struct flb_config_map config_map[] = {
594596
"For new discovered files on start (without a database offset/position), read the "
595597
"content from the head of the file, not tail."
596598
},
599+
{
600+
FLB_CONFIG_MAP_BOOL, "read_newly_discovered_files_from_head", "true",
601+
0, FLB_TRUE, offsetof(struct flb_tail_config, read_newly_discovered_files_from_head),
602+
"For new discovered files after start (without a database offset/position), read the "
603+
"content from the head of the file, not tail."
604+
},
597605
{
598606
FLB_CONFIG_MAP_STR, "refresh_interval", "60",
599607
0, FLB_FALSE, 0,

plugins/in_tail/tail_config.c

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -279,6 +279,14 @@ struct flb_tail_config *flb_tail_config_create(struct flb_input_instance *ins,
279279
return NULL;
280280
}
281281

282+
/* hash table mapping paths of files skipped by ignore_older to their sizes */
283+
ctx->ignored_file_sizes = flb_hash_table_create(FLB_HASH_TABLE_EVICT_NONE, 1000, 0);
284+
if (ctx->ignored_file_sizes == NULL) {
285+
flb_plg_error(ctx->ins, "could not create ignored file size hash table");
286+
flb_tail_config_destroy(ctx);
287+
return NULL;
288+
}
289+
282290
#ifdef FLB_HAVE_SQLDB
283291
ctx->db = NULL;
284292
#endif
@@ -507,10 +515,16 @@ int flb_tail_config_destroy(struct flb_tail_config *config)
507515
if (config->static_hash) {
508516
flb_hash_table_destroy(config->static_hash);
509517
}
518+
510519
if (config->event_hash) {
511520
flb_hash_table_destroy(config->event_hash);
512521
}
513522

523+
if (config->ignored_file_sizes != NULL) {
524+
flb_hash_table_destroy(config->ignored_file_sizes);
525+
}
526+
514527
flb_free(config);
528+
515529
return 0;
516530
}

plugins/in_tail/tail_config.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@ struct flb_tail_config {
8282
#endif
8383
int refresh_interval_sec; /* seconds to re-scan */
8484
long refresh_interval_nsec;/* nanoseconds to re-scan */
85+
int read_newly_discovered_files_from_head; /* read new files from head after startup */
8586
int read_from_head; /* read new files from head */
8687
int rotate_wait; /* sec to wait on rotated files */
8788
int watcher_interval; /* watcher interval */
@@ -169,6 +170,8 @@ struct flb_tail_config {
169170
struct flb_hash_table *static_hash;
170171
struct flb_hash_table *event_hash;
171172

173+
struct flb_hash_table *ignored_file_sizes;
174+
172175
struct flb_config *config;
173176
};
174177

plugins/in_tail/tail_file.c

Lines changed: 23 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -911,13 +911,24 @@ static int set_file_position(struct flb_tail_config *ctx,
911911
return 0;
912912
}
913913

914-
/* tail... */
915-
ret = lseek(file->fd, 0, SEEK_END);
916-
if (ret == -1) {
917-
flb_errno();
918-
return -1;
914+
if (file->offset > 0) {
915+
ret = lseek(file->fd, file->offset, SEEK_SET);
916+
917+
if (ret == -1) {
918+
flb_errno();
919+
return -1;
920+
}
921+
}
922+
else {
923+
ret = lseek(file->fd, 0, SEEK_END);
924+
925+
if (ret == -1) {
926+
flb_errno();
927+
return -1;
928+
}
929+
930+
file->offset = ret;
919931
}
920-
file->offset = ret;
921932

922933
if (file->decompression_context == NULL) {
923934
file->stream_offset = ret;
@@ -966,6 +977,7 @@ static int ml_flush_callback(struct flb_ml_parser *parser,
966977
}
967978

968979
int flb_tail_file_append(char *path, struct stat *st, int mode,
980+
ssize_t offset,
969981
struct flb_tail_config *ctx)
970982
{
971983
int fd;
@@ -1055,6 +1067,10 @@ int flb_tail_file_append(char *path, struct stat *st, int mode,
10551067
file->mult_flush_timeout = 0;
10561068
file->mult_skipping = FLB_FALSE;
10571069

1070+
if (offset != -1) {
1071+
file->offset = offset;
1072+
}
1073+
10581074
if (strlen(path) >= 3 &&
10591075
strcasecmp(&path[strlen(path) - 3], ".gz") == 0) {
10601076
file->decompression_context =
@@ -1937,7 +1953,7 @@ int flb_tail_file_rotated(struct flb_tail_file *file)
19371953
ret = stat(tmp, &st);
19381954
if (ret == 0 && st.st_ino != file->inode) {
19391955
if (flb_tail_file_exists(&st, ctx) == FLB_FALSE) {
1940-
ret = flb_tail_file_append(tmp, &st, FLB_TAIL_STATIC, ctx);
1956+
ret = flb_tail_file_append(tmp, &st, FLB_TAIL_STATIC, -1, ctx);
19411957
if (ret == -1) {
19421958
flb_tail_scan(ctx->path_list, ctx);
19431959
}

plugins/in_tail/tail_file.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,7 @@ int flb_tail_file_name_dup(char *path, struct flb_tail_file *file);
119119
int flb_tail_file_to_event(struct flb_tail_file *file);
120120
int flb_tail_file_chunk(struct flb_tail_file *file);
121121
int flb_tail_file_append(char *path, struct stat *st, int mode,
122+
ssize_t offset,
122123
struct flb_tail_config *ctx);
123124
void flb_tail_file_remove(struct flb_tail_file *file);
124125
int flb_tail_file_remove_all(struct flb_tail_config *ctx);

plugins/in_tail/tail_scan.c

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,30 @@
3030
#include "tail_scan_glob.c"
3131
#endif
3232

33+
void flb_tail_scan_register_ignored_file_size(struct flb_tail_config *ctx, const char *path, size_t path_length, size_t size)
34+
{
35+
flb_hash_table_add(ctx->ignored_file_sizes, path, path_length, (void *) size, 0);
36+
37+
}
38+
39+
void flb_tail_scan_unregister_ignored_file_size(struct flb_tail_config *ctx, const char *path, size_t path_length)
40+
{
41+
flb_hash_table_del(ctx->ignored_file_sizes, path);
42+
}
43+
44+
ssize_t flb_tail_scan_fetch_ignored_file_size(struct flb_tail_config *ctx, const char *path, size_t path_length)
45+
{
46+
ssize_t result;
47+
48+
result = (ssize_t) flb_hash_table_get_ptr(ctx->ignored_file_sizes, path, path_length);
49+
50+
if (result == 0) {
51+
result = -1;
52+
}
53+
54+
return result;
55+
}
56+
3357
int flb_tail_scan(struct mk_list *path_list, struct flb_tail_config *ctx)
3458
{
3559
int ret;

plugins/in_tail/tail_scan.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,4 +26,8 @@ int flb_tail_scan(struct mk_list *path, struct flb_tail_config *ctx);
2626
int flb_tail_scan_callback(struct flb_input_instance *ins,
2727
struct flb_config *config, void *context);
2828

29+
void flb_tail_scan_register_ignored_file_size(struct flb_tail_config *ctx, const char *path, size_t path_length, size_t size);
30+
void flb_tail_scan_unregister_ignored_file_size(struct flb_tail_config *ctx, const char *path, size_t path_length);
31+
ssize_t flb_tail_scan_fetch_ignored_file_size(struct flb_tail_config *ctx, const char *path, size_t path_length);
32+
2933
#endif

plugins/in_tail/tail_scan_glob.c

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -183,6 +183,7 @@ static inline int do_glob(const char *pattern, int flags,
183183
return ret;
184184
}
185185

186+
186187
/* Scan a path, register the entries and return how many */
187188
static int tail_scan_path(const char *path, struct flb_tail_config *ctx)
188189
{
@@ -193,6 +194,9 @@ static int tail_scan_path(const char *path, struct flb_tail_config *ctx)
193194
time_t now;
194195
int64_t mtime;
195196
struct stat st;
197+
ssize_t ignored_file_size;
198+
199+
ignored_file_size = -1;
196200

197201
flb_plg_debug(ctx->ins, "scanning path %s", path);
198202

@@ -245,14 +249,36 @@ static int tail_scan_path(const char *path, struct flb_tail_config *ctx)
245249
if ((now - ctx->ignore_older) > mtime) {
246250
flb_plg_debug(ctx->ins, "excluded=%s (ignore_older)",
247251
globbuf.gl_pathv[i]);
252+
253+
flb_tail_scan_register_ignored_file_size(
254+
ctx,
255+
globbuf.gl_pathv[i],
256+
strlen(globbuf.gl_pathv[i]),
257+
st.st_size);
258+
248259
continue;
249260
}
250261
}
251262
}
252263

264+
if (ctx->ignore_older > 0) {
265+
ignored_file_size = flb_tail_scan_fetch_ignored_file_size(
266+
ctx,
267+
globbuf.gl_pathv[i],
268+
strlen(globbuf.gl_pathv[i]));
269+
270+
flb_tail_scan_unregister_ignored_file_size(
271+
ctx,
272+
globbuf.gl_pathv[i],
273+
strlen(globbuf.gl_pathv[i]));
274+
}
275+
253276
/* Append file to list */
254277
ret = flb_tail_file_append(globbuf.gl_pathv[i], &st,
255-
FLB_TAIL_STATIC, ctx);
278+
FLB_TAIL_STATIC,
279+
ignored_file_size,
280+
ctx);
281+
256282
if (ret == 0) {
257283
flb_plg_debug(ctx->ins, "scan_glob add(): %s, inode %" PRIu64,
258284
globbuf.gl_pathv[i], (uint64_t) st.st_ino);

plugins/in_tail/tail_scan_win32.c

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,9 @@ static int tail_register_file(const char *target, struct flb_tail_config *ctx,
6565
int64_t mtime;
6666
struct stat st;
6767
char path[MAX_PATH];
68+
ssize_t ignored_file_size;
69+
70+
ignored_file_size = -1;
6871

6972
if (_fullpath(path, target, MAX_PATH) == NULL) {
7073
flb_plg_error(ctx->ins, "cannot get absolute path of %s", target);
@@ -81,6 +84,13 @@ static int tail_register_file(const char *target, struct flb_tail_config *ctx,
8184
if ((ts - ctx->ignore_older) > mtime) {
8285
flb_plg_debug(ctx->ins, "excluded=%s (ignore_older)",
8386
target);
87+
88+
flb_tail_scan_register_ignored_file_size(
89+
ctx,
90+
path,
91+
strlen(path),
92+
st.st_size);
93+
8494
return -1;
8595
}
8696
}
@@ -91,7 +101,19 @@ static int tail_register_file(const char *target, struct flb_tail_config *ctx,
91101
return -1;
92102
}
93103

94-
return flb_tail_file_append(path, &st, FLB_TAIL_STATIC, ctx);
104+
if (ctx->ignore_older > 0) {
105+
ignored_file_size = flb_tail_scan_fetch_ignored_file_size(
106+
ctx,
107+
path,
108+
strlen(path));
109+
110+
flb_tail_scan_unregister_ignored_file_size(
111+
ctx,
112+
path,
113+
strlen(path));
114+
}
115+
116+
return flb_tail_file_append(path, &st, FLB_TAIL_STATIC, ignored_file_size, ctx);
95117
}
96118

97119
/*

0 commit comments

Comments
 (0)