Commit d8d6703

Add option to read multiple active event files per directory (#1867)
* add DirectoryLoader implementation for multi-active-file loading
* plumb option to use DirectoryLoader through event processing classes
* add --reload_multifile{,_inactive_secs} flags to enable/configure multi-file directory loading behavior
* update README.md FAQ to mention flag as an experimental option
1 parent c00d7fc commit d8d6703

13 files changed: +626 -22 lines

README.md

Lines changed: 21 additions & 0 deletions
@@ -249,6 +249,13 @@ tensorboard in inspect mode to inspect the contents of your event files.
 
 ### TensorBoard is showing only some of my data, or isn't properly updating!
 
+> **Update:** the [experimental `--reload_multifile=true` option][pr-1867] can
+> now be used to poll all "active" files in a directory for new data, rather
+> than the most recent one as described below. A file is "active" as long as it
+> received new data within `--reload_multifile_inactive_secs` seconds ago,
+> defaulting to 4000. You may need to install our nightly build
+> [`tb-nightly`][tb-nightly] for this option to be available.
+
 This issue usually comes about because of how TensorBoard iterates through the
 `tfevents` files: it progresses through the events file in timestamp order, and
 only reads one file at a time. Let's suppose we have files with timestamps `a`
@@ -260,6 +267,12 @@ multiple summary writers, each one should be writing to a separate directory.
 
 ### Does TensorBoard support multiple or distributed summary writers?
 
+> **Update:** the [experimental `--reload_multifile=true` option][pr-1867] can
+> now be used to poll all "active" files in a directory for new data, defined as
+> any file that received new data within `--reload_multifile_inactive_secs`
+> seconds ago, defaulting to 4000. You may need to install our nightly build
+> [`tb-nightly`][tb-nightly] for this option to be available.
+
 No. TensorBoard expects that only one events file will be written to at a time,
 and multiple summary writers means multiple events files. If you are running a
 distributed TensorFlow instance, we encourage you to designate a single worker
@@ -275,6 +288,12 @@ with itself, there are a few possible explanations.
 * You may have multiple execution of TensorFlow that all wrote to the same log
   directory. Please have each TensorFlow run write to its own logdir.
 
+> **Update:** the [experimental `--reload_multifile=true` option][pr-1867] can
+> now be used to poll all "active" files in a directory for new data, defined
+> as any file that received new data within `--reload_multifile_inactive_secs`
+> seconds ago, defaulting to 4000. You may need to install our nightly build
+> [`tb-nightly`][tb-nightly] for this option to be available.
+
 * You may have a bug in your code where the global_step variable (passed
   to `FileWriter.add_summary`) is being maintained incorrectly.
 
@@ -380,3 +399,5 @@ information as you can provide (e.g. attaching events files, including the output
 of `tensorboard --inspect`, etc.).
 
 [stack-overflow]: https://stackoverflow.com/questions/tagged/tensorboard
+[pr-1867]: https://github.com/tensorflow/tensorboard/pull/1867
+[tb-nightly]: https://pypi.org/project/tb-nightly/
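
For anyone trying the option out, here is a hedged launch sketch (not part of this commit). It assumes the public `tensorboard.program` API; the logdir path and the 86400-second override are illustrative values only, and the same flags can equally be passed to the `tensorboard` command line.

```python
# Hedged sketch, not from this commit: enabling the experimental option when
# starting TensorBoard programmatically. '/tmp/logs' and 86400 are arbitrary.
from tensorboard import program

tb = program.TensorBoard()
tb.configure(argv=[
    None,  # placeholder for argv[0], as configure() expects
    '--logdir=/tmp/logs',
    '--reload_multifile=true',
    '--reload_multifile_inactive_secs=86400',
])
print('TensorBoard listening on %s' % tb.launch())
```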

tensorboard/backend/application.py

Lines changed: 20 additions & 1 deletion
@@ -106,11 +106,13 @@ def standard_tensorboard_wsgi(flags, plugin_loaders, assets_zip_provider):
   :type plugin_loaders: list[base_plugin.TBLoader]
   :rtype: TensorBoardWSGI
   """
+  event_file_active_filter = _get_event_file_active_filter(flags)
   multiplexer = event_multiplexer.EventMultiplexer(
       size_guidance=DEFAULT_SIZE_GUIDANCE,
       tensor_size_guidance=tensor_size_guidance_from_flags(flags),
       purge_orphaned_data=flags.purge_orphaned_data,
-      max_reload_threads=flags.max_reload_threads)
+      max_reload_threads=flags.max_reload_threads,
+      event_file_active_filter=event_file_active_filter)
   loading_multiplexer = multiplexer
   reload_interval = flags.reload_interval
   # For db import op mode, prefer reloading in a child process. See
@@ -530,3 +532,20 @@ def _clean_path(path, path_prefix=""):
   if path != path_prefix + '/' and path.endswith('/'):
     return path[:-1]
   return path
+
+
+def _get_event_file_active_filter(flags):
+  """Returns a predicate for whether an event file load timestamp is active.
+
+  Returns:
+    A predicate function accepting a single UNIX timestamp float argument, or
+    None if multi-file loading is not enabled.
+  """
+  if not flags.reload_multifile:
+    return None
+  inactive_secs = flags.reload_multifile_inactive_secs
+  if inactive_secs == 0:
+    return None
+  if inactive_secs < 0:
+    return lambda timestamp: True
+  return lambda timestamp: timestamp + inactive_secs >= time.time()
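
To make the predicate's behavior concrete, a hedged sketch (not code from this commit): it rebuilds the same kind of filter returned above and hands it to the event multiplexer, assuming the `event_multiplexer` alias in this file is TensorBoard's `plugin_event_multiplexer` and that the commit's plumbing carries the argument through unchanged.

```python
# Hedged sketch, not from this commit. The activity window mirrors the
# --reload_multifile_inactive_secs default of 4000 seconds.
import time

from tensorboard.backend.event_processing import plugin_event_multiplexer as event_multiplexer

inactive_secs = 4000
active_filter = lambda timestamp: timestamp + inactive_secs >= time.time()

print(active_filter(time.time()))         # True: file just received data
print(active_filter(time.time() - 4001))  # False: outside the activity window

# Per the diff above, the multiplexer accepts the predicate and (per the
# commit message) plumbs it down to the per-directory loaders.
multiplexer = event_multiplexer.EventMultiplexer(
    event_file_active_filter=active_filter)
multiplexer.AddRunsFromDirectory('/tmp/logs')  # illustrative path
multiplexer.Reload()
```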

tensorboard/backend/application_test.py

Lines changed: 38 additions & 1 deletion
@@ -27,6 +27,7 @@
 import shutil
 import socket
 import tempfile
+import time
 
 import six
 
@@ -58,7 +59,9 @@ def __init__(
       db_import=False,
       db_import_use_op=False,
       window_title='',
-      path_prefix=''):
+      path_prefix='',
+      reload_multifile=False,
+      reload_multifile_inactive_secs=4000):
     self.logdir = logdir
     self.purge_orphaned_data = purge_orphaned_data
     self.reload_interval = reload_interval
@@ -70,6 +73,8 @@ def __init__(
     self.db_import_use_op = db_import_use_op
     self.window_title = window_title
     self.path_prefix = path_prefix
+    self.reload_multifile = reload_multifile
+    self.reload_multifile_inactive_secs = reload_multifile_inactive_secs
 
 
 class FakePlugin(base_plugin.TBPlugin):
@@ -366,6 +371,38 @@ def testSlashlessRoute(self):
     self._test('runaway', False)
 
 
+class GetEventFileActiveFilterTest(tb_test.TestCase):
+
+  def testDisabled(self):
+    flags = FakeFlags('logdir', reload_multifile=False)
+    self.assertIsNone(application._get_event_file_active_filter(flags))
+
+  def testInactiveSecsZero(self):
+    flags = FakeFlags('logdir', reload_multifile=True,
+                      reload_multifile_inactive_secs=0)
+    self.assertIsNone(application._get_event_file_active_filter(flags))
+
+  def testInactiveSecsNegative(self):
+    flags = FakeFlags('logdir', reload_multifile=True,
+                      reload_multifile_inactive_secs=-1)
+    filter_fn = application._get_event_file_active_filter(flags)
+    self.assertTrue(filter_fn(0))
+    self.assertTrue(filter_fn(time.time()))
+    self.assertTrue(filter_fn(float("inf")))
+
+  def testInactiveSecs(self):
+    flags = FakeFlags('logdir', reload_multifile=True,
+                      reload_multifile_inactive_secs=10)
+    filter_fn = application._get_event_file_active_filter(flags)
+    with mock.patch.object(time, 'time') as mock_time:
+      mock_time.return_value = 100
+      self.assertFalse(filter_fn(0))
+      self.assertFalse(filter_fn(time.time() - 11))
+      self.assertTrue(filter_fn(time.time() - 10))
+      self.assertTrue(filter_fn(time.time()))
+      self.assertTrue(filter_fn(float("inf")))
+
+
 class ParseEventFilesSpecTest(tb_test.TestCase):
 
   def assertPlatformSpecificLogdirParsing(self, pathObj, logdir, expected):

tensorboard/backend/event_processing/BUILD

Lines changed: 29 additions & 0 deletions
@@ -30,6 +30,33 @@ py_test(
     ],
 )
 
+py_library(
+    name = "directory_loader",
+    srcs = ["directory_loader.py"],
+    srcs_version = "PY2AND3",
+    deps = [
+        ":directory_watcher",
+        ":io_wrapper",
+        "//tensorboard/compat:tensorflow",
+        "//tensorboard/util:tb_logging",
+    ],
+)
+
+py_test(
+    name = "directory_loader_test",
+    size = "small",
+    srcs = ["directory_loader_test.py"],
+    srcs_version = "PY2AND3",
+    deps = [
+        ":directory_loader",
+        ":directory_watcher",
+        ":event_file_loader",
+        "//tensorboard:expect_tensorflow_installed",
+        "//tensorboard/util:test_util",
+        "@org_pythonhosted_mock",
+    ],
+)
+
 py_library(
     name = "directory_watcher",
     srcs = ["directory_watcher.py"],
@@ -101,6 +128,7 @@ py_library(
     srcs_version = "PY2AND3",
     visibility = ["//visibility:public"],
     deps = [
+        ":directory_loader",
         ":directory_watcher",
        ":event_file_loader",
         ":io_wrapper",
@@ -190,6 +218,7 @@ py_test(
         ":event_accumulator",
         ":event_multiplexer",
         "//tensorboard:expect_tensorflow_installed",
+        "//tensorboard/util:test_util",
     ],
 )

tensorboard/backend/event_processing/directory_loader.py

Lines changed: 136 additions & 0 deletions
@@ -0,0 +1,136 @@
+# Copyright 2019 The TensorFlow Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ==============================================================================
+
+"""Implementation for a multi-file directory loader."""
+
+from __future__ import absolute_import
+from __future__ import division
+from __future__ import print_function
+
+from tensorboard.backend.event_processing import directory_watcher
+from tensorboard.backend.event_processing import io_wrapper
+from tensorboard.compat import tf
+from tensorboard.util import tb_logging
+
+
+logger = tb_logging.get_logger()
+
+
+# Sentinel object for an inactive path.
+_INACTIVE = object()
+
+
+class DirectoryLoader(object):
+  """Loader for an entire directory, maintaining multiple active file loaders.
+
+  This class takes a directory, a factory for loaders, and optionally a
+  path filter and watches all the paths inside that directory for new data.
+  Each file loader created by the factory must read a path and produce an
+  iterator of (timestamp, value) pairs.
+
+  Unlike DirectoryWatcher, this class does not assume that only one file
+  receives new data at a time; there can be arbitrarily many active files.
+  However, any file whose maximum load timestamp fails an "active" predicate
+  will be marked as inactive and no longer checked for new data.
+  """
+
+  def __init__(self, directory, loader_factory, path_filter=lambda x: True,
+               active_filter=lambda timestamp: True):
+    """Constructs a new MultiFileDirectoryLoader.
+
+    Args:
+      directory: The directory to load files from.
+      loader_factory: A factory for creating loaders. The factory should take a
+        path and return an object that has a Load method returning an iterator
+        yielding (unix timestamp as float, value) pairs for any new data
+      path_filter: If specified, only paths matching this filter are loaded.
+      active_filter: If specified, any loader whose maximum load timestamp does
+        not pass this filter will be marked as inactive and no longer read.
+
+    Raises:
+      ValueError: If directory or loader_factory are None.
+    """
+    if directory is None:
+      raise ValueError('A directory is required')
+    if loader_factory is None:
+      raise ValueError('A loader factory is required')
+    self._directory = directory
+    self._loader_factory = loader_factory
+    self._path_filter = path_filter
+    self._active_filter = active_filter
+    self._loaders = {}
+    self._max_timestamps = {}
+
+  def Load(self):
+    """Loads new values from all active files.
+
+    Yields:
+      All values that have not been yielded yet.
+
+    Raises:
+      DirectoryDeletedError: If the directory has been permanently deleted
+        (as opposed to being temporarily unavailable).
+    """
+    try:
+      all_paths = io_wrapper.ListDirectoryAbsolute(self._directory)
+      paths = sorted(p for p in all_paths if self._path_filter(p))
+      for path in paths:
+        for value in self._LoadPath(path):
+          yield value
+    except tf.errors.OpError as e:
+      if not tf.io.gfile.exists(self._directory):
+        raise directory_watcher.DirectoryDeletedError(
+            'Directory %s has been permanently deleted' % self._directory)
+      else:
+        logger.info('Ignoring error during file loading: %s' % e)
+
+  def _LoadPath(self, path):
+    """Generator for values from a single path's loader.
+
+    Args:
+      path: the path to load from
+
+    Yields:
+      All values from this path's loader that have not been yielded yet.
+    """
+    max_timestamp = self._max_timestamps.get(path, None)
+    if max_timestamp is _INACTIVE or self._MarkIfInactive(path, max_timestamp):
+      logger.debug('Skipping inactive path %s', path)
+      return
+    loader = self._loaders.get(path, None)
+    if loader is None:
+      try:
+        loader = self._loader_factory(path)
+      except tf.errors.NotFoundError:
+        # Happens if a file was removed after we listed the directory.
+        logger.debug('Skipping nonexistent path %s', path)
+        return
+      self._loaders[path] = loader
+    logger.info('Loading data from path %s', path)
+    for timestamp, value in loader.Load():
+      if max_timestamp is None or timestamp > max_timestamp:
+        max_timestamp = timestamp
+      yield value
+    if not self._MarkIfInactive(path, max_timestamp):
+      self._max_timestamps[path] = max_timestamp
+
+  def _MarkIfInactive(self, path, max_timestamp):
+    """If max_timestamp is inactive, returns True and marks the path as such."""
+    logger.debug('Checking active status of %s at %s', path, max_timestamp)
+    if max_timestamp is not None and not self._active_filter(max_timestamp):
+      self._max_timestamps[path] = _INACTIVE
+      del self._loaders[path]
+      return True
+    return False
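
To make the loader_factory contract concrete, here is a minimal hedged usage sketch (not part of this commit): `LineLoader`, `/tmp/demo_dir`, and the `.log` filter are hypothetical stand-ins, and in TensorBoard itself the factory would produce event-file loaders yielding (timestamp, event) pairs.

```python
# Hedged usage sketch, not from this commit. LineLoader, '/tmp/demo_dir', and
# the '.log' filter are hypothetical; they exist only to satisfy the
# loader_factory contract described in the docstrings above.
import time

from tensorboard.backend.event_processing import directory_loader


class LineLoader(object):
  """Toy per-path loader: Load() yields (timestamp, line) for unseen lines."""

  def __init__(self, path):
    self._path = path
    self._offset = 0

  def Load(self):
    with open(self._path) as f:
      f.seek(self._offset)
      for line in f.readlines():
        yield (time.time(), line.rstrip('\n'))
      self._offset = f.tell()


loader = directory_loader.DirectoryLoader(
    '/tmp/demo_dir',
    LineLoader,
    path_filter=lambda path: path.endswith('.log'),
    # Mirrors the --reload_multifile_inactive_secs default of 4000 seconds: a
    # path stays active while its newest loaded timestamp is recent enough.
    active_filter=lambda timestamp: timestamp + 4000 >= time.time())

for value in loader.Load():  # yields only values not seen on earlier calls
  print(value)
```

Note the design choice visible in `_MarkIfInactive`: once a path's newest timestamp falls outside the window, its loader is discarded and the path is never polled again, which is exactly the behavior `--reload_multifile_inactive_secs` tunes.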
