From 81f1385a00cb139f70bb3261452b2a2777c21f69 Mon Sep 17 00:00:00 2001 From: Nick Felt Date: Mon, 22 Jul 2019 18:41:01 -0700 Subject: [PATCH 01/11] add DirectoryLoader implementation for multi-active-file loading --- tensorboard/backend/event_processing/BUILD | 25 ++ .../event_processing/directory_loader.py | 130 ++++++++++ .../event_processing/directory_loader_test.py | 224 ++++++++++++++++++ .../event_processing/event_file_loader.py | 17 ++ 4 files changed, 396 insertions(+) create mode 100644 tensorboard/backend/event_processing/directory_loader.py create mode 100644 tensorboard/backend/event_processing/directory_loader_test.py diff --git a/tensorboard/backend/event_processing/BUILD b/tensorboard/backend/event_processing/BUILD index f360a17109..e33566aca9 100644 --- a/tensorboard/backend/event_processing/BUILD +++ b/tensorboard/backend/event_processing/BUILD @@ -30,6 +30,31 @@ py_test( ], ) +py_library( + name = "directory_loader", + srcs = ["directory_loader.py"], + srcs_version = "PY2AND3", + deps = [ + ":directory_watcher", + ":io_wrapper", + "//tensorboard/compat:tensorflow", + "//tensorboard/util:tb_logging", + ], +) + +py_test( + name = "directory_loader_test", + size = "small", + srcs = ["directory_loader_test.py"], + srcs_version = "PY2AND3", + deps = [ + ":directory_loader", + ":directory_watcher", + "//tensorboard:expect_tensorflow_installed", + "@org_pythonhosted_mock", + ], +) + py_library( name = "directory_watcher", srcs = ["directory_watcher.py"], diff --git a/tensorboard/backend/event_processing/directory_loader.py b/tensorboard/backend/event_processing/directory_loader.py new file mode 100644 index 0000000000..952fc407c4 --- /dev/null +++ b/tensorboard/backend/event_processing/directory_loader.py @@ -0,0 +1,130 @@ +# Copyright 2019 The TensorFlow Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============================================================================== + +"""Implementation for a multi-file directory loader.""" + +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +from tensorboard.backend.event_processing import directory_watcher +from tensorboard.backend.event_processing import io_wrapper +from tensorboard.compat import tf +from tensorboard.util import tb_logging + + +logger = tb_logging.get_logger() + + +# Sentinel object for an inactive path. +_INACTIVE = object() + + +class DirectoryLoader(object): + """Loader for an entire directory, maintaining multiple active file loaders. + + This class takes a directory, a factory for loaders, and optionally a + path filter and watches all the paths inside that directory for new data. + Each file loader created by the factory must read a path and produce an + iterator of (timestamp, value) pairs. + + Unlike DirectoryWatcher, this class does not assume that only one file + receives new data at a time; there can be arbitrarily many active files. 
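A minimal sketch of the loader contract this class expects (not part of the patch; the record format, directory path, and one-hour threshold below are hypothetical). The factory is any callable that takes a path and returns an object whose `Load()` yields `(unix timestamp as float, value)` pairs, resuming where the previous call left off:

```python
import time

from tensorboard.backend.event_processing import directory_loader


class LineRecordLoader(object):
  """Hypothetical loader: reads appended 'timestamp:value' lines."""

  def __init__(self, path):
    self._file = open(path)

  def Load(self):
    while True:
      line = self._file.readline()
      if not line:
        return  # No new data yet; a later Load() resumes from here.
      timestamp, _, value = line.rstrip('\n').partition(':')
      yield float(timestamp), value


loader = directory_loader.DirectoryLoader(
    '/tmp/records',  # hypothetical directory
    LineRecordLoader,
    path_filter=lambda path: path.endswith('.records'),
    # Treat a file as inactive once its newest record is over an hour old.
    active_filter=lambda timestamp: timestamp + 3600 >= time.time())
for value in loader.Load():  # Repeated calls yield only not-yet-seen values.
  print(value)
```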
+ However, any file whose maximum load timestamp fails an "active" predicate + will be marked as inactive and no longer checked for new data. + """ + + def __init__(self, directory, loader_factory, path_filter=lambda x: True, + active_filter=lambda timestamp: True): + """Constructs a new MultiFileDirectoryLoader. + + Args: + directory: The directory to load files from. + loader_factory: A factory for creating loaders. The factory should take a + path and return an object that has a Load method returning an iterator + yielding (unix timestamp as float, value) pairs for any new data + path_filter: If specified, only paths matching this filter are loaded. + active_filter: If specified, any loader whose maximum load timestamp does + not pass this filter will be marked as inactive and no longer read. + + Raises: + ValueError: If directory or loader_factory are None. + """ + if directory is None: + raise ValueError('A directory is required') + if loader_factory is None: + raise ValueError('A loader factory is required') + self._directory = directory + self._loader_factory = loader_factory + self._path_filter = path_filter + self._active_filter = active_filter + self._loaders = {} + self._max_timestamps = {} + + def Load(self): + """Loads new values from all active files. + + Yields: + All values that have not been yielded yet. + + Raises: + DirectoryDeletedError: If the directory has been permanently deleted + (as opposed to being temporarily unavailable). + """ + try: + all_paths = io_wrapper.ListDirectoryAbsolute(self._directory) + paths = sorted(p for p in all_paths if self._path_filter(p)) + for path in paths: + for value in self._LoadPath(path): + yield value + except tf.errors.OpError: + if not tf.io.gfile.exists(self._directory): + raise directory_watcher.DirectoryDeletedError( + 'Directory %s has been permanently deleted' % self._directory) + + def _LoadPath(self, path): + """Generator for values from a single path's loader. + + Args: + path: the path to load from + + Yields: + All values from this path's loader that have not been yielded yet. + """ + max_timestamp = self._max_timestamps.get(path, None) + if max_timestamp is _INACTIVE or self._MarkIfInactive(path, max_timestamp): + logger.debug('Skipping inactive path %s', path) + return + loader = self._loaders.get(path, None) + if loader is None: + loader = self._loader_factory(path) + self._loaders[path] = loader + logger.info('Loading data from path %s', path) + for timestamp, value in loader.Load(): + if max_timestamp is None or (timestamp is not None + and timestamp > max_timestamp): + max_timestamp = timestamp + yield value + if not self._MarkIfInactive(path, max_timestamp): + self._max_timestamps[path] = max_timestamp + + def _MarkIfInactive(self, path, max_timestamp): + """If max_timestamp is inactive, returns True and marks the path as such.""" + logger.debug('Checking active status of %s at %s', path, max_timestamp) + if max_timestamp is not None and not self._active_filter(max_timestamp): + self._max_timestamps[path] = _INACTIVE + del self._loaders[path] + return True + return False diff --git a/tensorboard/backend/event_processing/directory_loader_test.py b/tensorboard/backend/event_processing/directory_loader_test.py new file mode 100644 index 0000000000..792b421e3f --- /dev/null +++ b/tensorboard/backend/event_processing/directory_loader_test.py @@ -0,0 +1,224 @@ +# Copyright 2019 The TensorFlow Authors. All Rights Reserved. 
+# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============================================================================== + +"""Tests for directory_loader.""" + +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +import functools +import os +import shutil + +try: + # python version >= 3.3 + from unittest import mock # pylint: disable=g-import-not-at-top +except ImportError: + import mock # pylint: disable=g-import-not-at-top,unused-import + +import tensorflow as tf + +from tensorboard.backend.event_processing import directory_loader +from tensorboard.backend.event_processing import directory_watcher +from tensorboard.backend.event_processing import io_wrapper + + +class _TimestampedByteLoader(object): + """A loader that loads timestamped bytes from a file.""" + + def __init__(self, path, registry=None): + self._path = path + self._registry = registry if registry is not None else [] + self._registry.append(path) + self._f = open(path) + + def __del__(self): + self._registry.remove(self._path) + + def Load(self): + while True: + line = self._f.readline() + if not line: + return + ts, value = line.rstrip('\n').split(':') + yield float(ts), value + + +class DirectoryLoaderTest(tf.test.TestCase): + + def setUp(self): + # Put everything in a directory so it's easier to delete w/in tests. + self._directory = os.path.join(self.get_temp_dir(), 'testdir') + os.mkdir(self._directory) + self._loader = directory_loader.DirectoryLoader( + self._directory, _TimestampedByteLoader) + + def _WriteToFile(self, filename, data, timestamps=None): + if timestamps is None: + timestamps = range(len(data)) + self.assertEqual(len(data), len(timestamps)) + path = os.path.join(self._directory, filename) + with open(path, 'a') as f: + for byte, timestamp in zip(data, timestamps): + f.write('%f:%s\n' % (timestamp, byte)) + + def assertLoaderYields(self, values): + self.assertEqual(list(self._loader.Load()), values) + + def testRaisesWithBadArguments(self): + with self.assertRaises(ValueError): + directory_loader.DirectoryLoader(None, lambda x: None) + with self.assertRaises(ValueError): + directory_loader.DirectoryLoader('dir', None) + + def testEmptyDirectory(self): + self.assertLoaderYields([]) + + def testSingleFileLoading(self): + self._WriteToFile('a', 'abc') + self.assertLoaderYields(['a', 'b', 'c']) + self.assertLoaderYields([]) + self._WriteToFile('a', 'xyz') + self.assertLoaderYields(['x', 'y', 'z']) + self.assertLoaderYields([]) + + def testMultipleFileLoading(self): + self._WriteToFile('a', 'a') + self._WriteToFile('b', 'b') + self.assertLoaderYields(['a', 'b']) + self.assertLoaderYields([]) + self._WriteToFile('a', 'A') + self._WriteToFile('b', 'B') + self._WriteToFile('c', 'c') + # The loader should read new data from all the files. 
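+    # (A DirectoryWatcher, by contrast, never returns to a file once it has
+    # moved on to a newer one, so the new 'A' data would have been missed.)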
+ self.assertLoaderYields(['A', 'B', 'c']) + self.assertLoaderYields([]) + + def testMultipleFileLoading_intermediateEmptyFiles(self): + self._WriteToFile('a', 'a') + self._WriteToFile('b', '') + self._WriteToFile('c', 'c') + self.assertLoaderYields(['a', 'c']) + + def testPathFilter(self): + self._loader = directory_loader.DirectoryLoader( + self._directory, _TimestampedByteLoader, + lambda path: 'tfevents' in path) + self._WriteToFile('skipped', 'a') + self._WriteToFile('event.out.tfevents.foo.bar', 'b') + self._WriteToFile('tf.event', 'c') + self.assertLoaderYields(['b']) + + def testActiveFilter_staticFilterBehavior(self): + """Tests behavior of a static active_filter.""" + loader_registry = [] + loader_factory = functools.partial( + _TimestampedByteLoader, registry=loader_registry) + active_filter = lambda timestamp: timestamp >= 2 + self._loader = directory_loader.DirectoryLoader( + self._directory, loader_factory, active_filter=active_filter) + def assertLoadersForPaths(paths): + paths = [os.path.join(self._directory, path) for path in paths] + self.assertEqual(loader_registry, paths) + # a: normal-looking file. + # b: file without sufficiently active data (should be marked inactive). + # c: file with timestamps in reverse order (max computed correctly). + # d: empty file (should be considered active in absence of timestamps). + self._WriteToFile('a', ['A1', 'A2'], [1, 2]) + self._WriteToFile('b', ['B1'], [1]) + self._WriteToFile('c', ['C2', 'C1', 'C0'], [2, 1, 0]) + self._WriteToFile('d', [], []) + self.assertLoaderYields(['A1', 'A2', 'B1', 'C2', 'C1', 'C0']) + assertLoadersForPaths(['a', 'c', 'd']) + self._WriteToFile('a', ['A3'], [3]) + self._WriteToFile('b', ['B3'], [3]) + self._WriteToFile('c', ['C0'], [0]) + self._WriteToFile('d', ['D3'], [3]) + self.assertLoaderYields(['A3', 'C0', 'D3']) + assertLoadersForPaths(['a', 'c', 'd']) + # Check that a 0 timestamp in file C on the most recent load doesn't + # override the max timestamp of 2 seen in the earlier load. + self._WriteToFile('c', ['C4'], [4]) + self.assertLoaderYields(['C4']) + assertLoadersForPaths(['a', 'c', 'd']) + + def testActiveFilter_dynamicFilterBehavior(self): + """Tests behavior of a dynamic active_filter.""" + loader_registry = [] + loader_factory = functools.partial( + _TimestampedByteLoader, registry=loader_registry) + threshold = 0 + active_filter = lambda timestamp: timestamp >= threshold + self._loader = directory_loader.DirectoryLoader( + self._directory, loader_factory, active_filter=active_filter) + def assertLoadersForPaths(paths): + paths = [os.path.join(self._directory, path) for path in paths] + self.assertEqual(loader_registry, paths) + self._WriteToFile('a', ['A1', 'A2'], [1, 2]) + self._WriteToFile('b', ['B1', 'B2', 'B3'], [1, 2, 3]) + self._WriteToFile('c', ['C1'], [1]) + threshold = 2 + # First load pass should leave file C marked inactive. + self.assertLoaderYields(['A1', 'A2', 'B1', 'B2', 'B3', 'C1']) + assertLoadersForPaths(['a', 'b']) + self._WriteToFile('a', ['A4'], [4]) + self._WriteToFile('b', ['B4'], [4]) + self._WriteToFile('c', ['C4'], [4]) + threshold = 3 + # Second load pass should mark file A as inactive (due to newly + # increased threshold) and thus skip reading data from it. + self.assertLoaderYields(['B4']) + assertLoadersForPaths(['b']) + self._WriteToFile('b', ['B5', 'B6'], [5, 6]) + # Simulate a third pass in which the threshold increases while + # we're processing a file, so it's still active at the start of the + # load but should be marked inactive at the end. 
+ load_generator = self._loader.Load() + self.assertEqual('B5', next(load_generator)) + threshold = 7 + self.assertEqual(['B6'], list(load_generator)) + assertLoadersForPaths([]) + # Confirm that all loaders are now inactive. + self._WriteToFile('b', ['B7'], [7]) + self.assertLoaderYields([]) + + def testDoesntCrashWhenFileIsDeleted(self): + self._WriteToFile('a', 'a') + self.assertLoaderYields(['a']) + os.remove(os.path.join(self._directory, 'a')) + self._WriteToFile('b', 'b') + self.assertLoaderYields(['b']) + + def testRaisesDirectoryDeletedError_whenDirectoryIsDeleted(self): + self._WriteToFile('a', 'a') + self.assertLoaderYields(['a']) + shutil.rmtree(self._directory) + with self.assertRaises(directory_watcher.DirectoryDeletedError): + next(self._loader.Load()) + + def testDoesntRaiseDirectoryDeletedError_forUnrecognizedException(self): + self._WriteToFile('a', 'a') + self.assertLoaderYields(['a']) + class MyException(Exception): + pass + with mock.patch.object(io_wrapper, 'ListDirectoryAbsolute') as mock_listdir: + mock_listdir.side_effect = MyException + with self.assertRaises(MyException): + next(self._loader.Load()) + self.assertLoaderYields([]) + +if __name__ == '__main__': + tf.test.main() diff --git a/tensorboard/backend/event_processing/event_file_loader.py b/tensorboard/backend/event_processing/event_file_loader.py index c85f2bd664..50615e257c 100644 --- a/tensorboard/backend/event_processing/event_file_loader.py +++ b/tensorboard/backend/event_processing/event_file_loader.py @@ -93,3 +93,20 @@ def Load(self): """ for record in super(EventFileLoader, self).Load(): yield event_pb2.Event.FromString(record) + + +class TimestampedEventFileLoader(EventFileLoader): + """An iterator that yields (UNIX timestamp float, Event proto) pairs.""" + + def Load(self): + """Loads all new events and their wall time values from disk. + + Calling Load multiple times in a row will not 'drop' events as long as the + return value is not iterated over. + + Yields: + Pairs of (UNIX timestamp float, Event proto) for all events in the file + that have not been yielded yet. 
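As a hedged sketch of how this loader composes with `DirectoryLoader` (this mirrors the wiring a later commit in this series adds to `_GeneratorFromPath`; the run directory is hypothetical):

```python
from tensorboard.backend.event_processing import directory_loader
from tensorboard.backend.event_processing import event_file_loader
from tensorboard.backend.event_processing import io_wrapper

loader = directory_loader.DirectoryLoader(
    '/tmp/logs/run1',  # hypothetical run directory
    event_file_loader.TimestampedEventFileLoader,
    path_filter=io_wrapper.IsTensorFlowEventsFile)
for event in loader.Load():  # Yields Event protos across all active files.
  print(event.wall_time)
```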
+ """ + for event in super(TimestampedEventFileLoader, self).Load(): + yield (event.wall_time, event) From c2e00a2ed8569ccb46865fbb3415705bb03d06fa Mon Sep 17 00:00:00 2001 From: Nick Felt Date: Tue, 19 Feb 2019 12:48:41 -0800 Subject: [PATCH 02/11] plumb option to use DirectoryLoader through event processing classes --- tensorboard/backend/event_processing/BUILD | 3 ++ .../plugin_event_accumulator.py | 17 ++++++-- .../plugin_event_accumulator_test.py | 4 +- .../plugin_event_multiplexer.py | 10 ++++- .../plugin_event_multiplexer_test.py | 39 +++++++++++++++++-- 5 files changed, 64 insertions(+), 9 deletions(-) diff --git a/tensorboard/backend/event_processing/BUILD b/tensorboard/backend/event_processing/BUILD index e33566aca9..e93a2eade6 100644 --- a/tensorboard/backend/event_processing/BUILD +++ b/tensorboard/backend/event_processing/BUILD @@ -126,6 +126,7 @@ py_library( srcs_version = "PY2AND3", visibility = ["//visibility:public"], deps = [ + ":directory_loader", ":directory_watcher", ":event_file_loader", ":io_wrapper", @@ -215,6 +216,8 @@ py_test( ":event_accumulator", ":event_multiplexer", "//tensorboard:expect_tensorflow_installed", + "//tensorboard/compat/proto:protos_all_py_pb2", + "//tensorboard/util:test_util", ], ) diff --git a/tensorboard/backend/event_processing/plugin_event_accumulator.py b/tensorboard/backend/event_processing/plugin_event_accumulator.py index 08c5461dad..2615ae07bf 100644 --- a/tensorboard/backend/event_processing/plugin_event_accumulator.py +++ b/tensorboard/backend/event_processing/plugin_event_accumulator.py @@ -23,6 +23,7 @@ import six from tensorboard import data_compat +from tensorboard.backend.event_processing import directory_loader from tensorboard.backend.event_processing import directory_watcher from tensorboard.backend.event_processing import event_file_loader from tensorboard.backend.event_processing import io_wrapper @@ -105,7 +106,8 @@ def __init__(self, path, size_guidance=None, tensor_size_guidance=None, - purge_orphaned_data=True): + purge_orphaned_data=True, + eventfile_active_filter=None): """Construct the `EventAccumulator`. Args: @@ -125,6 +127,9 @@ def __init__(self, `size_guidance[event_accumulator.TENSORS]`. Defaults to `{}`. purge_orphaned_data: Whether to discard any events that were "orphaned" by a TensorFlow restart. + eventfile_active_filter: Optional predicate for determining whether an + eventfile latest load timestamp should be considered active. If passed, + this will enable multifile directory loading. 
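A hedged illustration of the kind of predicate this parameter accepts (timestamps are UNIX seconds; the path and the one-hour threshold are assumptions for the example, not defaults from the patch):

```python
import time

from tensorboard.backend.event_processing import plugin_event_accumulator

accumulator = plugin_event_accumulator.EventAccumulator(
    '/tmp/logs/run1',  # hypothetical run directory
    # Keep polling an event file while its newest data is under an hour old.
    eventfile_active_filter=lambda timestamp: timestamp + 3600 >= time.time())
accumulator.Reload()
```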
""" size_guidance = dict(size_guidance or DEFAULT_SIZE_GUIDANCE) sizes = {} @@ -156,7 +161,7 @@ def __init__(self, self._plugin_tag_locks = collections.defaultdict(threading.Lock) self.path = path - self._generator = _GeneratorFromPath(path) + self._generator = _GeneratorFromPath(path, eventfile_active_filter) self._generator_mutex = threading.Lock() self.purge_orphaned_data = purge_orphaned_data @@ -568,12 +573,18 @@ def _GetPurgeMessage(most_recent_step, most_recent_wall_time, event_step, event_step, event_wall_time) -def _GeneratorFromPath(path): +def _GeneratorFromPath(path, eventfile_active_filter=None): """Create an event generator for file or directory at given path string.""" if not path: raise ValueError('path must be a valid string') if io_wrapper.IsTensorFlowEventsFile(path): return event_file_loader.EventFileLoader(path) + elif eventfile_active_filter: + return directory_loader.DirectoryLoader( + path, + event_file_loader.TimestampedEventFileLoader, + path_filter=io_wrapper.IsTensorFlowEventsFile, + active_filter=eventfile_active_filter) else: return directory_watcher.DirectoryWatcher( path, diff --git a/tensorboard/backend/event_processing/plugin_event_accumulator_test.py b/tensorboard/backend/event_processing/plugin_event_accumulator_test.py index 6a3f2434aa..ce07e3d631 100644 --- a/tensorboard/backend/event_processing/plugin_event_accumulator_test.py +++ b/tensorboard/backend/event_processing/plugin_event_accumulator_test.py @@ -131,7 +131,9 @@ def setUp(self): self._real_generator = ea._GeneratorFromPath def _FakeAccumulatorConstructor(generator, *args, **kwargs): - ea._GeneratorFromPath = lambda x: generator + def _FakeGeneratorFromPath(path, eventfile_active_filter=None): + return generator + ea._GeneratorFromPath = _FakeGeneratorFromPath return self._real_constructor(generator, *args, **kwargs) ea.EventAccumulator = _FakeAccumulatorConstructor diff --git a/tensorboard/backend/event_processing/plugin_event_multiplexer.py b/tensorboard/backend/event_processing/plugin_event_multiplexer.py index 126d1622d7..3765f94f98 100644 --- a/tensorboard/backend/event_processing/plugin_event_multiplexer.py +++ b/tensorboard/backend/event_processing/plugin_event_multiplexer.py @@ -75,7 +75,8 @@ def __init__(self, size_guidance=None, tensor_size_guidance=None, purge_orphaned_data=True, - max_reload_threads=None): + max_reload_threads=None, + eventfile_active_filter=None): """Constructor for the `EventMultiplexer`. Args: @@ -93,6 +94,9 @@ def __init__(self, max_reload_threads: The max number of threads that TensorBoard can use to reload runs. Each thread reloads one run at a time. If not provided, reloads runs serially (one after another). + eventfile_active_filter: Optional predicate for determining whether an + eventfile latest load timestamp should be considered active. If passed, + this will enable multifile directory loading. 
""" logger.info('Event Multiplexer initializing.') self._accumulators_mutex = threading.Lock() @@ -104,6 +108,7 @@ def __init__(self, self._tensor_size_guidance = tensor_size_guidance self.purge_orphaned_data = purge_orphaned_data self._max_reload_threads = max_reload_threads or 1 + self._eventfile_active_filter = eventfile_active_filter if run_path_map is not None: logger.info('Event Multplexer doing initialization load for %s', run_path_map) @@ -144,7 +149,8 @@ def AddRun(self, path, name=None): path, size_guidance=self._size_guidance, tensor_size_guidance=self._tensor_size_guidance, - purge_orphaned_data=self.purge_orphaned_data) + purge_orphaned_data=self.purge_orphaned_data, + eventfile_active_filter=self._eventfile_active_filter) self._accumulators[name] = accumulator self._paths[name] = path if accumulator: diff --git a/tensorboard/backend/event_processing/plugin_event_multiplexer_test.py b/tensorboard/backend/event_processing/plugin_event_multiplexer_test.py index 600a291604..10ff38a946 100644 --- a/tensorboard/backend/event_processing/plugin_event_multiplexer_test.py +++ b/tensorboard/backend/event_processing/plugin_event_multiplexer_test.py @@ -25,6 +25,8 @@ from tensorboard.backend.event_processing import plugin_event_accumulator as event_accumulator # pylint: disable=line-too-long from tensorboard.backend.event_processing import plugin_event_multiplexer as event_multiplexer # pylint: disable=line-too-long +from tensorboard.compat.proto import summary_pb2 +from tensorboard.util import test_util def _AddEvents(path): @@ -89,8 +91,10 @@ def Reload(self): def _GetFakeAccumulator(path, size_guidance=None, tensor_size_guidance=None, - purge_orphaned_data=None): + purge_orphaned_data=None, + eventfile_active_filter=None): del size_guidance, tensor_size_guidance, purge_orphaned_data # Unused. + del eventfile_active_filter # unused return _FakeAccumulator(path) @@ -366,10 +370,39 @@ def testReloadWith1Thread(self): class EventMultiplexerWithRealAccumulatorTest(tf.test.TestCase): + def testMultifileReload(self): + multiplexer = event_multiplexer.EventMultiplexer( + eventfile_active_filter=lambda timestamp: True) + def make_summary(tag_name): + return summary_pb2.Summary( + value=[summary_pb2.Summary.Value(tag=tag_name, simple_value=1.0)]) + logdir = self.get_temp_dir() + run_name = 'run1' + run_path = os.path.join(logdir, run_name) + # Create two separate event files, using filename suffix to ensure a + # deterministic sort order, and then simulate a write to file A, then + # to file B, then another write to file A (with reloads after each). + with test_util.FileWriter(run_path, filename_suffix='.a') as writer_a: + writer_a.add_summary(make_summary('a1'), 1) + writer_a.flush() + multiplexer.AddRunsFromDirectory(logdir) + multiplexer.Reload() + with test_util.FileWriter(run_path, filename_suffix='.b') as writer_b: + writer_b.add_summary(make_summary('b'), 1) + multiplexer.Reload() + writer_a.add_summary(make_summary('a2'), 2) + writer_a.flush() + multiplexer.Reload() + # Both event files should be treated as active, so we should load the newly + # written data to the first file even though it's no longer the latest one. 
+ self.assertEqual(1, len(multiplexer.Tensors(run_name, 'a1'))) + self.assertEqual(1, len(multiplexer.Tensors(run_name, 'b'))) + self.assertEqual(1, len(multiplexer.Tensors(run_name, 'a2'))) + def testDeletingDirectoryRemovesRun(self): x = event_multiplexer.EventMultiplexer() tmpdir = self.get_temp_dir() - self.add3RunsToMultiplexer(tmpdir, x) + self._add3RunsToMultiplexer(tmpdir, x) x.Reload() # Delete the directory, then reload. @@ -377,7 +410,7 @@ def testDeletingDirectoryRemovesRun(self): x.Reload() self.assertNotIn('run2', x.Runs().keys()) - def add3RunsToMultiplexer(self, logdir, multiplexer): + def _add3RunsToMultiplexer(self, logdir, multiplexer): """Creates and adds 3 runs to the multiplexer.""" run1_dir = os.path.join(logdir, 'run1') run2_dir = os.path.join(logdir, 'run2') From ebbee75dac5ba374f3bee1001d1f3ecefdb4ab5f Mon Sep 17 00:00:00 2001 From: Nick Felt Date: Tue, 19 Feb 2019 12:49:15 -0800 Subject: [PATCH 03/11] add --reload_multifile{,_inactive_secs} flags to enable/configure multi-file directory loading behavior --- tensorboard/backend/application.py | 20 ++++++++- tensorboard/backend/application_test.py | 39 +++++++++++++++++- tensorboard/plugins/core/core_plugin.py | 55 ++++++++++++++++++++----- 3 files changed, 101 insertions(+), 13 deletions(-) diff --git a/tensorboard/backend/application.py b/tensorboard/backend/application.py index d32c5e22c9..fa96608022 100644 --- a/tensorboard/backend/application.py +++ b/tensorboard/backend/application.py @@ -106,11 +106,13 @@ def standard_tensorboard_wsgi(flags, plugin_loaders, assets_zip_provider): :type plugin_loaders: list[base_plugin.TBLoader] :rtype: TensorBoardWSGI """ + eventfile_active_filter = _get_eventfile_active_filter(flags) multiplexer = event_multiplexer.EventMultiplexer( size_guidance=DEFAULT_SIZE_GUIDANCE, tensor_size_guidance=tensor_size_guidance_from_flags(flags), purge_orphaned_data=flags.purge_orphaned_data, - max_reload_threads=flags.max_reload_threads) + max_reload_threads=flags.max_reload_threads, + eventfile_active_filter=eventfile_active_filter) loading_multiplexer = multiplexer reload_interval = flags.reload_interval # For db import op mode, prefer reloading in a child process. See @@ -530,3 +532,19 @@ def _clean_path(path, path_prefix=""): if path != path_prefix + '/' and path.endswith('/'): return path[:-1] return path + + +def _get_eventfile_active_filter(flags): + """Returns a predicate for whether an eventfile load timestamp is active. + + Returns: + A predicate function accepting a single UNIX timestamp float argument. 
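A worked illustration of the returned predicate's semantics, using the flag's default of 4000 seconds (the other numbers are arbitrary examples):

```python
import time

inactive_secs = 4000  # --reload_multifile_inactive_secs
is_active = lambda timestamp: timestamp + inactive_secs >= time.time()

now = time.time()
is_active(now - 3999)  # True: newest data is younger than the threshold.
is_active(now - 4001)  # False: the file is considered inactive and dropped.
# A negative flag value yields a predicate that is always True, and a zero
# value disables multi-file loading altogether (the function returns None).
```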
+ """ + if not flags.reload_multifile: + return None + inactive_secs = flags.reload_multifile_inactive_secs + if inactive_secs == 0: + return None + if inactive_secs < 0: + return lambda timestamp: True + return lambda timestamp: timestamp + inactive_secs >= time.time() diff --git a/tensorboard/backend/application_test.py b/tensorboard/backend/application_test.py index a96ce478b9..29a0d0b163 100644 --- a/tensorboard/backend/application_test.py +++ b/tensorboard/backend/application_test.py @@ -27,6 +27,7 @@ import shutil import socket import tempfile +import time import six @@ -58,7 +59,9 @@ def __init__( db_import=False, db_import_use_op=False, window_title='', - path_prefix=''): + path_prefix='', + reload_multifile=False, + reload_multifile_inactive_secs=4000): self.logdir = logdir self.purge_orphaned_data = purge_orphaned_data self.reload_interval = reload_interval @@ -70,6 +73,8 @@ def __init__( self.db_import_use_op = db_import_use_op self.window_title = window_title self.path_prefix = path_prefix + self.reload_multifile = reload_multifile + self.reload_multifile_inactive_secs = reload_multifile_inactive_secs class FakePlugin(base_plugin.TBPlugin): @@ -366,6 +371,38 @@ def testSlashlessRoute(self): self._test('runaway', False) +class GetEventfileActiveFilterTest(tb_test.TestCase): + + def testDisabled(self): + flags = FakeFlags('logdir', reload_multifile=False) + self.assertIsNone(application._get_eventfile_active_filter(flags)) + + def testInactiveSecsZero(self): + flags = FakeFlags('logdir', reload_multifile=True, + reload_multifile_inactive_secs=0) + self.assertIsNone(application._get_eventfile_active_filter(flags)) + + def testInactiveSecsNegative(self): + flags = FakeFlags('logdir', reload_multifile=True, + reload_multifile_inactive_secs=-1) + filter = application._get_eventfile_active_filter(flags) + self.assertTrue(filter(0)) + self.assertTrue(filter(time.time())) + self.assertTrue(filter(float("inf"))) + + def testInactiveSecs(self): + flags = FakeFlags('logdir', reload_multifile=True, + reload_multifile_inactive_secs=10) + filter = application._get_eventfile_active_filter(flags) + with mock.patch.object(time, 'time') as mock_time: + mock_time.return_value = 100 + self.assertFalse(filter(0)) + self.assertFalse(filter(time.time() - 11)) + self.assertTrue(filter(time.time() - 10)) + self.assertTrue(filter(time.time())) + self.assertTrue(filter(float("inf"))) + + class ParseEventFilesSpecTest(tb_test.TestCase): def assertPlatformSpecificLogdirParsing(self, pathObj, logdir, expected): diff --git a/tensorboard/plugins/core/core_plugin.py b/tensorboard/plugins/core/core_plugin.py index 9433969505..dbdf627c83 100644 --- a/tensorboard/plugins/core/core_plugin.py +++ b/tensorboard/plugins/core/core_plugin.py @@ -319,17 +319,6 @@ def define_flags(self, parser): Whether to purge data that may have been orphaned due to TensorBoard restarts. Setting --purge_orphaned_data=False can be used to debug data disappearance. (default: %(default)s)\ -''') - - parser.add_argument( - '--reload_interval', - metavar='SECONDS', - type=float, - default=5.0, - help='''\ -How often the backend should load more data, in seconds. Set to 0 to -load just once at startup and a negative number to never reload at all. -Not relevant for DB read-only mode. (default: %(default)s)\ ''') parser.add_argument( @@ -433,6 +422,17 @@ def define_flags(self, parser): The max number of threads that TensorBoard can use to reload runs. Not relevant for db read-only mode. Each thread reloads one run at a time. 
(default: %(default)s)\ +''') + + parser.add_argument( + '--reload_interval', + metavar='SECONDS', + type=float, + default=5.0, + help='''\ +How often the backend should load more data, in seconds. Set to 0 to +load just once at startup and a negative number to never reload at all. +Not relevant for DB read-only mode. (default: %(default)s)\ ''') parser.add_argument( @@ -447,6 +447,39 @@ def define_flags(self, parser): and a child process for DB import reloading. The "process" option is only useful with DB import mode. The "blocking" option will block startup until reload finishes, and requires --load_interval=0. (default: %(default)s)\ +''') + + parser.add_argument( + '--reload_multifile', + metavar='BOOL', + # Custom str-to-bool converter since regular bool() doesn't work. + type=lambda v: {'true': True, 'false': False}.get(v.lower(), v), + choices=[True, False], + default=False, + help='''\ +[experimental] If true, this enables experimental support for continuously +polling multiple event files in each run directory for newly appended data +(rather than only polling the last event file). Event files will only be +polled as long as their most recently read data is newer than the threshold +defined by --reload_multifile_inactive_secs, to limit resource usage. Beware +of running out of memory if the logdir contains many active eventfiles. +(default: %(default)s)\ +''') + + parser.add_argument( + '--reload_multifile_inactive_secs', + metavar='SECONDS', + type=int, + default=4000, + help='''\ +[experimental] Configures the age threshold in seconds at which an eventfile +that has no event wall time more recent than that will be considered an +inactive file and no longer polled (to limit resource usage). If set to -1, +no maximum age will be enforced, but beware of running out of memory and +heavier filesystem read traffic. If set to 0, this reverts to the older +last-file-only polling strategy (akin to --reload_multifile=false). +(default: %(default)s - intended to ensure an eventfile remains active if +it receives new data at least once per hour)\ ''') parser.add_argument( From 29d0504e109f06e7b853f14b37e94bf375374867 Mon Sep 17 00:00:00 2001 From: Nick Felt Date: Tue, 19 Feb 2019 13:09:59 -0800 Subject: [PATCH 04/11] update README.md FAQ to mention flag as an experimental option --- README.md | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/README.md b/README.md index 487626e4a1..248e125964 100644 --- a/README.md +++ b/README.md @@ -249,6 +249,11 @@ tensorboard in inspect mode to inspect the contents of your event files. ### TensorBoard is showing only some of my data, or isn't properly updating! +> **Update:** the [experimental `--reload_multifile=true` option][pr-1867] can +> now be used to poll multiple files per directory for new data, rather than +> just the most recent one as described below. You may need to install our +> nightly build [`tb-nightly`][tb-nightly] for this option to be available. + This issue usually comes about because of how TensorBoard iterates through the `tfevents` files: it progresses through the events file in timestamp order, and only reads one file at a time. Let's suppose we have files with timestamps `a` @@ -260,6 +265,11 @@ multiple summary writers, each one should be writing to a separate directory. ### Does TensorBoard support multiple or distributed summary writers? +> **Update:** the [experimental `--reload_multifile=true` option][pr-1867] can +> now be used to poll multiple files per directory for new data. 
You may need +> to install our nightly build [`tb-nightly`][tb-nightly] for this option to +> be available. + No. TensorBoard expects that only one events file will be written to at a time, and multiple summary writers means multiple events files. If you are running a distributed TensorFlow instance, we encourage you to designate a single worker @@ -275,6 +285,11 @@ with itself, there are a few possible explanations. * You may have multiple execution of TensorFlow that all wrote to the same log directory. Please have each TensorFlow run write to its own logdir. + > **Update:** the [experimental `--reload_multifile=true` option][pr-1867] can + > now be used to poll multiple files per directory for new data. You may need + > to install our nightly build [`tb-nightly`][tb-nightly] for this option to + > be available. + * You may have a bug in your code where the global_step variable (passed to `FileWriter.add_summary`) is being maintained incorrectly. @@ -372,3 +387,5 @@ information as you can provide (e.g. attaching events files, including the outpu of `tensorboard --inspect`, etc.). [stack-overflow]: https://stackoverflow.com/questions/tagged/tensorboard +[pr-1867]: https://github.com/tensorflow/tensorboard/pull/1867 +[tb-nightly]: https://pypi.org/project/tb-nightly/ From c494ff3c9c3eb43a3b7b04856f5950a9c305f68a Mon Sep 17 00:00:00 2001 From: Nick Felt Date: Thu, 25 Jul 2019 17:20:06 -0700 Subject: [PATCH 05/11] CR: rename eventfile->event file, rename filter->filter_fn --- tensorboard/backend/application.py | 8 +++--- tensorboard/backend/application_test.py | 26 +++++++++---------- .../plugin_event_accumulator.py | 14 +++++----- .../plugin_event_accumulator_test.py | 2 +- .../plugin_event_multiplexer.py | 10 +++---- .../plugin_event_multiplexer_test.py | 6 ++--- tensorboard/plugins/core/core_plugin.py | 6 ++--- 7 files changed, 36 insertions(+), 36 deletions(-) diff --git a/tensorboard/backend/application.py b/tensorboard/backend/application.py index fa96608022..6128aaedb0 100644 --- a/tensorboard/backend/application.py +++ b/tensorboard/backend/application.py @@ -106,13 +106,13 @@ def standard_tensorboard_wsgi(flags, plugin_loaders, assets_zip_provider): :type plugin_loaders: list[base_plugin.TBLoader] :rtype: TensorBoardWSGI """ - eventfile_active_filter = _get_eventfile_active_filter(flags) + event_file_active_filter = _get_event_file_active_filter(flags) multiplexer = event_multiplexer.EventMultiplexer( size_guidance=DEFAULT_SIZE_GUIDANCE, tensor_size_guidance=tensor_size_guidance_from_flags(flags), purge_orphaned_data=flags.purge_orphaned_data, max_reload_threads=flags.max_reload_threads, - eventfile_active_filter=eventfile_active_filter) + event_file_active_filter=event_file_active_filter) loading_multiplexer = multiplexer reload_interval = flags.reload_interval # For db import op mode, prefer reloading in a child process. See @@ -534,8 +534,8 @@ def _clean_path(path, path_prefix=""): return path -def _get_eventfile_active_filter(flags): - """Returns a predicate for whether an eventfile load timestamp is active. +def _get_event_file_active_filter(flags): + """Returns a predicate for whether an event file load timestamp is active. Returns: A predicate function accepting a single UNIX timestamp float argument. 
diff --git a/tensorboard/backend/application_test.py b/tensorboard/backend/application_test.py index 29a0d0b163..e5944421ea 100644 --- a/tensorboard/backend/application_test.py +++ b/tensorboard/backend/application_test.py @@ -371,36 +371,36 @@ def testSlashlessRoute(self): self._test('runaway', False) -class GetEventfileActiveFilterTest(tb_test.TestCase): +class GetEventFileActiveFilterTest(tb_test.TestCase): def testDisabled(self): flags = FakeFlags('logdir', reload_multifile=False) - self.assertIsNone(application._get_eventfile_active_filter(flags)) + self.assertIsNone(application._get_event_file_active_filter(flags)) def testInactiveSecsZero(self): flags = FakeFlags('logdir', reload_multifile=True, reload_multifile_inactive_secs=0) - self.assertIsNone(application._get_eventfile_active_filter(flags)) + self.assertIsNone(application._get_event_file_active_filter(flags)) def testInactiveSecsNegative(self): flags = FakeFlags('logdir', reload_multifile=True, reload_multifile_inactive_secs=-1) - filter = application._get_eventfile_active_filter(flags) - self.assertTrue(filter(0)) - self.assertTrue(filter(time.time())) - self.assertTrue(filter(float("inf"))) + filter_fn = application._get_event_file_active_filter(flags) + self.assertTrue(filter_fn(0)) + self.assertTrue(filter_fn(time.time())) + self.assertTrue(filter_fn(float("inf"))) def testInactiveSecs(self): flags = FakeFlags('logdir', reload_multifile=True, reload_multifile_inactive_secs=10) - filter = application._get_eventfile_active_filter(flags) + filter_fn = application._get_event_file_active_filter(flags) with mock.patch.object(time, 'time') as mock_time: mock_time.return_value = 100 - self.assertFalse(filter(0)) - self.assertFalse(filter(time.time() - 11)) - self.assertTrue(filter(time.time() - 10)) - self.assertTrue(filter(time.time())) - self.assertTrue(filter(float("inf"))) + self.assertFalse(filter_fn(0)) + self.assertFalse(filter_fn(time.time() - 11)) + self.assertTrue(filter_fn(time.time() - 10)) + self.assertTrue(filter_fn(time.time())) + self.assertTrue(filter_fn(float("inf"))) class ParseEventFilesSpecTest(tb_test.TestCase): diff --git a/tensorboard/backend/event_processing/plugin_event_accumulator.py b/tensorboard/backend/event_processing/plugin_event_accumulator.py index 2615ae07bf..27400a21ed 100644 --- a/tensorboard/backend/event_processing/plugin_event_accumulator.py +++ b/tensorboard/backend/event_processing/plugin_event_accumulator.py @@ -107,7 +107,7 @@ def __init__(self, size_guidance=None, tensor_size_guidance=None, purge_orphaned_data=True, - eventfile_active_filter=None): + event_file_active_filter=None): """Construct the `EventAccumulator`. Args: @@ -127,8 +127,8 @@ def __init__(self, `size_guidance[event_accumulator.TENSORS]`. Defaults to `{}`. purge_orphaned_data: Whether to discard any events that were "orphaned" by a TensorFlow restart. - eventfile_active_filter: Optional predicate for determining whether an - eventfile latest load timestamp should be considered active. If passed, + event_file_active_filter: Optional predicate for determining whether an + event file latest load timestamp should be considered active. If passed, this will enable multifile directory loading. 
""" size_guidance = dict(size_guidance or DEFAULT_SIZE_GUIDANCE) @@ -161,7 +161,7 @@ def __init__(self, self._plugin_tag_locks = collections.defaultdict(threading.Lock) self.path = path - self._generator = _GeneratorFromPath(path, eventfile_active_filter) + self._generator = _GeneratorFromPath(path, event_file_active_filter) self._generator_mutex = threading.Lock() self.purge_orphaned_data = purge_orphaned_data @@ -573,18 +573,18 @@ def _GetPurgeMessage(most_recent_step, most_recent_wall_time, event_step, event_step, event_wall_time) -def _GeneratorFromPath(path, eventfile_active_filter=None): +def _GeneratorFromPath(path, event_file_active_filter=None): """Create an event generator for file or directory at given path string.""" if not path: raise ValueError('path must be a valid string') if io_wrapper.IsTensorFlowEventsFile(path): return event_file_loader.EventFileLoader(path) - elif eventfile_active_filter: + elif event_file_active_filter: return directory_loader.DirectoryLoader( path, event_file_loader.TimestampedEventFileLoader, path_filter=io_wrapper.IsTensorFlowEventsFile, - active_filter=eventfile_active_filter) + active_filter=event_file_active_filter) else: return directory_watcher.DirectoryWatcher( path, diff --git a/tensorboard/backend/event_processing/plugin_event_accumulator_test.py b/tensorboard/backend/event_processing/plugin_event_accumulator_test.py index ce07e3d631..4961fccf1a 100644 --- a/tensorboard/backend/event_processing/plugin_event_accumulator_test.py +++ b/tensorboard/backend/event_processing/plugin_event_accumulator_test.py @@ -131,7 +131,7 @@ def setUp(self): self._real_generator = ea._GeneratorFromPath def _FakeAccumulatorConstructor(generator, *args, **kwargs): - def _FakeGeneratorFromPath(path, eventfile_active_filter=None): + def _FakeGeneratorFromPath(path, event_file_active_filter=None): return generator ea._GeneratorFromPath = _FakeGeneratorFromPath return self._real_constructor(generator, *args, **kwargs) diff --git a/tensorboard/backend/event_processing/plugin_event_multiplexer.py b/tensorboard/backend/event_processing/plugin_event_multiplexer.py index 3765f94f98..d147a6272d 100644 --- a/tensorboard/backend/event_processing/plugin_event_multiplexer.py +++ b/tensorboard/backend/event_processing/plugin_event_multiplexer.py @@ -76,7 +76,7 @@ def __init__(self, tensor_size_guidance=None, purge_orphaned_data=True, max_reload_threads=None, - eventfile_active_filter=None): + event_file_active_filter=None): """Constructor for the `EventMultiplexer`. Args: @@ -94,8 +94,8 @@ def __init__(self, max_reload_threads: The max number of threads that TensorBoard can use to reload runs. Each thread reloads one run at a time. If not provided, reloads runs serially (one after another). - eventfile_active_filter: Optional predicate for determining whether an - eventfile latest load timestamp should be considered active. If passed, + event_file_active_filter: Optional predicate for determining whether an + event file latest load timestamp should be considered active. If passed, this will enable multifile directory loading. 
""" logger.info('Event Multiplexer initializing.') @@ -108,7 +108,7 @@ def __init__(self, self._tensor_size_guidance = tensor_size_guidance self.purge_orphaned_data = purge_orphaned_data self._max_reload_threads = max_reload_threads or 1 - self._eventfile_active_filter = eventfile_active_filter + self._event_file_active_filter = event_file_active_filter if run_path_map is not None: logger.info('Event Multplexer doing initialization load for %s', run_path_map) @@ -150,7 +150,7 @@ def AddRun(self, path, name=None): size_guidance=self._size_guidance, tensor_size_guidance=self._tensor_size_guidance, purge_orphaned_data=self.purge_orphaned_data, - eventfile_active_filter=self._eventfile_active_filter) + event_file_active_filter=self._event_file_active_filter) self._accumulators[name] = accumulator self._paths[name] = path if accumulator: diff --git a/tensorboard/backend/event_processing/plugin_event_multiplexer_test.py b/tensorboard/backend/event_processing/plugin_event_multiplexer_test.py index 10ff38a946..31ad7b1ffe 100644 --- a/tensorboard/backend/event_processing/plugin_event_multiplexer_test.py +++ b/tensorboard/backend/event_processing/plugin_event_multiplexer_test.py @@ -92,9 +92,9 @@ def _GetFakeAccumulator(path, size_guidance=None, tensor_size_guidance=None, purge_orphaned_data=None, - eventfile_active_filter=None): + event_file_active_filter=None): del size_guidance, tensor_size_guidance, purge_orphaned_data # Unused. - del eventfile_active_filter # unused + del event_file_active_filter # unused return _FakeAccumulator(path) @@ -372,7 +372,7 @@ class EventMultiplexerWithRealAccumulatorTest(tf.test.TestCase): def testMultifileReload(self): multiplexer = event_multiplexer.EventMultiplexer( - eventfile_active_filter=lambda timestamp: True) + event_file_active_filter=lambda timestamp: True) def make_summary(tag_name): return summary_pb2.Summary( value=[summary_pb2.Summary.Value(tag=tag_name, simple_value=1.0)]) diff --git a/tensorboard/plugins/core/core_plugin.py b/tensorboard/plugins/core/core_plugin.py index dbdf627c83..102989be07 100644 --- a/tensorboard/plugins/core/core_plugin.py +++ b/tensorboard/plugins/core/core_plugin.py @@ -462,7 +462,7 @@ def define_flags(self, parser): (rather than only polling the last event file). Event files will only be polled as long as their most recently read data is newer than the threshold defined by --reload_multifile_inactive_secs, to limit resource usage. Beware -of running out of memory if the logdir contains many active eventfiles. +of running out of memory if the logdir contains many active event files. (default: %(default)s)\ ''') @@ -472,13 +472,13 @@ def define_flags(self, parser): type=int, default=4000, help='''\ -[experimental] Configures the age threshold in seconds at which an eventfile +[experimental] Configures the age threshold in seconds at which an event file that has no event wall time more recent than that will be considered an inactive file and no longer polled (to limit resource usage). If set to -1, no maximum age will be enforced, but beware of running out of memory and heavier filesystem read traffic. If set to 0, this reverts to the older last-file-only polling strategy (akin to --reload_multifile=false). 
-(default: %(default)s - intended to ensure an eventfile remains active if +(default: %(default)s - intended to ensure an event file remains active if it receives new data at least once per hour)\ ''') From 1be64683b04433d70aa77a8d9b5e7f7af88acb23 Mon Sep 17 00:00:00 2001 From: Nick Felt Date: Thu, 25 Jul 2019 17:21:06 -0700 Subject: [PATCH 06/11] CR: clarify return value of _get_event_file_active_filter() --- tensorboard/backend/application.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tensorboard/backend/application.py b/tensorboard/backend/application.py index 6128aaedb0..bab3066848 100644 --- a/tensorboard/backend/application.py +++ b/tensorboard/backend/application.py @@ -538,7 +538,8 @@ def _get_event_file_active_filter(flags): """Returns a predicate for whether an event file load timestamp is active. Returns: - A predicate function accepting a single UNIX timestamp float argument. + A predicate function accepting a single UNIX timestamp float argument, or + None if multi-file loading is not enabled. """ if not flags.reload_multifile: return None From e792ad8cb4bc46c3db08d56578836800a2d1b402 Mon Sep 17 00:00:00 2001 From: Nick Felt Date: Fri, 26 Jul 2019 17:14:12 -0700 Subject: [PATCH 07/11] CR: make test_util.FileWriter usable from eager mode --- tensorboard/util/test_util.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/tensorboard/util/test_util.py b/tensorboard/util/test_util.py index 2c5fd4eeea..69e94b5021 100644 --- a/tensorboard/util/test_util.py +++ b/tensorboard/util/test_util.py @@ -46,6 +46,11 @@ class FileWriter(tf.compat.v1.summary.FileWriter): for testing in integrational style (writing out event files and use the real event readers). """ + def __init__(self, *args, **kwargs): + # Briefly enter graph mode context so this testing FileWriter can be + # created from an eager mode context without triggering a usage error. 
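+    # (tf.compat.v1.summary.FileWriter raises a RuntimeError if constructed
+    # while eager execution is enabled, hence the temporary graph context.)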
+ with tf.compat.v1.Graph().as_default(): + super(FileWriter, self).__init__(*args, **kwargs) def add_event(self, event): if isinstance(event, event_pb2.Event): From 7aa18e734ac7ad40c36fd5504e98150a2bfe73d3 Mon Sep 17 00:00:00 2001 From: Nick Felt Date: Fri, 26 Jul 2019 17:14:51 -0700 Subject: [PATCH 08/11] CR: introduce test_util.FileWriter.add_test_summary() shortcut --- tensorboard/backend/event_processing/BUILD | 1 - .../event_processing/plugin_event_multiplexer_test.py | 10 +++------- tensorboard/util/test_util.py | 6 ++++++ 3 files changed, 9 insertions(+), 8 deletions(-) diff --git a/tensorboard/backend/event_processing/BUILD b/tensorboard/backend/event_processing/BUILD index e93a2eade6..5458148788 100644 --- a/tensorboard/backend/event_processing/BUILD +++ b/tensorboard/backend/event_processing/BUILD @@ -216,7 +216,6 @@ py_test( ":event_accumulator", ":event_multiplexer", "//tensorboard:expect_tensorflow_installed", - "//tensorboard/compat/proto:protos_all_py_pb2", "//tensorboard/util:test_util", ], ) diff --git a/tensorboard/backend/event_processing/plugin_event_multiplexer_test.py b/tensorboard/backend/event_processing/plugin_event_multiplexer_test.py index 31ad7b1ffe..2b81b32ade 100644 --- a/tensorboard/backend/event_processing/plugin_event_multiplexer_test.py +++ b/tensorboard/backend/event_processing/plugin_event_multiplexer_test.py @@ -25,7 +25,6 @@ from tensorboard.backend.event_processing import plugin_event_accumulator as event_accumulator # pylint: disable=line-too-long from tensorboard.backend.event_processing import plugin_event_multiplexer as event_multiplexer # pylint: disable=line-too-long -from tensorboard.compat.proto import summary_pb2 from tensorboard.util import test_util @@ -373,9 +372,6 @@ class EventMultiplexerWithRealAccumulatorTest(tf.test.TestCase): def testMultifileReload(self): multiplexer = event_multiplexer.EventMultiplexer( event_file_active_filter=lambda timestamp: True) - def make_summary(tag_name): - return summary_pb2.Summary( - value=[summary_pb2.Summary.Value(tag=tag_name, simple_value=1.0)]) logdir = self.get_temp_dir() run_name = 'run1' run_path = os.path.join(logdir, run_name) @@ -383,14 +379,14 @@ def make_summary(tag_name): # deterministic sort order, and then simulate a write to file A, then # to file B, then another write to file A (with reloads after each). 
with test_util.FileWriter(run_path, filename_suffix='.a') as writer_a: - writer_a.add_summary(make_summary('a1'), 1) + writer_a.add_test_summary('a1', step=1) writer_a.flush() multiplexer.AddRunsFromDirectory(logdir) multiplexer.Reload() with test_util.FileWriter(run_path, filename_suffix='.b') as writer_b: - writer_b.add_summary(make_summary('b'), 1) + writer_b.add_test_summary('b', step=1) multiplexer.Reload() - writer_a.add_summary(make_summary('a2'), 2) + writer_a.add_test_summary('a2', step=2) writer_a.flush() multiplexer.Reload() # Both event files should be treated as active, so we should load the newly diff --git a/tensorboard/util/test_util.py b/tensorboard/util/test_util.py index 69e94b5021..c970dcf472 100644 --- a/tensorboard/util/test_util.py +++ b/tensorboard/util/test_util.py @@ -52,6 +52,12 @@ def __init__(self, *args, **kwargs): with tf.compat.v1.Graph().as_default(): super(FileWriter, self).__init__(*args, **kwargs) + def add_test_summary(self, tag, simple_value=1.0, step=None): + """Convenience for writing a simple summary for a given tag.""" + value = summary_pb2.Summary.Value(tag=tag, simple_value=simple_value) + summary = summary_pb2.Summary(value=[value]) + self.add_summary(summary, global_step=step) + def add_event(self, event): if isinstance(event, event_pb2.Event): tf_event = tf.compat.v1.Event.FromString(event.SerializeToString()) From df5803f4cb7aed5dcbb4c627be09c9bb5bd9fb5b Mon Sep 17 00:00:00 2001 From: Nick Felt Date: Fri, 26 Jul 2019 17:15:42 -0700 Subject: [PATCH 09/11] CR: improve DirectoryLoader handling and testing of filesystem errors --- tensorboard/backend/event_processing/BUILD | 2 + .../event_processing/directory_loader.py | 14 ++++-- .../event_processing/directory_loader_test.py | 47 ++++++++++++++++--- 3 files changed, 53 insertions(+), 10 deletions(-) diff --git a/tensorboard/backend/event_processing/BUILD b/tensorboard/backend/event_processing/BUILD index 5458148788..db12734e30 100644 --- a/tensorboard/backend/event_processing/BUILD +++ b/tensorboard/backend/event_processing/BUILD @@ -50,7 +50,9 @@ py_test( deps = [ ":directory_loader", ":directory_watcher", + ":event_file_loader", "//tensorboard:expect_tensorflow_installed", + "//tensorboard/util:test_util", "@org_pythonhosted_mock", ], ) diff --git a/tensorboard/backend/event_processing/directory_loader.py b/tensorboard/backend/event_processing/directory_loader.py index 952fc407c4..4182c3f38d 100644 --- a/tensorboard/backend/event_processing/directory_loader.py +++ b/tensorboard/backend/event_processing/directory_loader.py @@ -89,10 +89,12 @@ def Load(self): for path in paths: for value in self._LoadPath(path): yield value - except tf.errors.OpError: + except tf.errors.OpError as e: if not tf.io.gfile.exists(self._directory): raise directory_watcher.DirectoryDeletedError( 'Directory %s has been permanently deleted' % self._directory) + else: + logger.info('Ignoring error during file loading: %s' % e) def _LoadPath(self, path): """Generator for values from a single path's loader. @@ -109,12 +111,16 @@ def _LoadPath(self, path): return loader = self._loaders.get(path, None) if loader is None: - loader = self._loader_factory(path) + try: + loader = self._loader_factory(path) + except tf.errors.NotFoundError: + # Happens if a file was removed after we listed the directory. 
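+        # (The directory listing and the open are not atomic, so a file can
+        # vanish in between; we skip this path and continue with the rest.)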
+ logger.debug('Skipping nonexistent path %s', path) + return self._loaders[path] = loader logger.info('Loading data from path %s', path) for timestamp, value in loader.Load(): - if max_timestamp is None or (timestamp is not None - and timestamp > max_timestamp): + if max_timestamp is None or timestamp > max_timestamp: max_timestamp = timestamp yield value if not self._MarkIfInactive(path, max_timestamp): diff --git a/tensorboard/backend/event_processing/directory_loader_test.py b/tensorboard/backend/event_processing/directory_loader_test.py index 792b421e3f..9b87958fab 100644 --- a/tensorboard/backend/event_processing/directory_loader_test.py +++ b/tensorboard/backend/event_processing/directory_loader_test.py @@ -20,6 +20,7 @@ from __future__ import print_function import functools +import glob import os import shutil @@ -33,7 +34,9 @@ from tensorboard.backend.event_processing import directory_loader from tensorboard.backend.event_processing import directory_watcher +from tensorboard.backend.event_processing import event_file_loader from tensorboard.backend.event_processing import io_wrapper +from tensorboard.util import test_util class _TimestampedByteLoader(object): @@ -195,12 +198,44 @@ def assertLoadersForPaths(paths): self._WriteToFile('b', ['B7'], [7]) self.assertLoaderYields([]) - def testDoesntCrashWhenFileIsDeleted(self): - self._WriteToFile('a', 'a') - self.assertLoaderYields(['a']) - os.remove(os.path.join(self._directory, 'a')) - self._WriteToFile('b', 'b') - self.assertLoaderYields(['b']) + def testDoesntCrashWhenCurrentFileIsDeleted(self): + # Use actual file loader so it emits the real error. + self._loader = directory_loader.DirectoryLoader( + self._directory, event_file_loader.TimestampedEventFileLoader) + def make_summary(tag_name): + return summary_pb2.Summary( + value=[summary_pb2.Summary.Value(tag=tag_name, simple_value=1.0)]) + with test_util.FileWriter(self._directory, filename_suffix='.a') as writer_a: + writer_a.add_test_summary('a') + events = list(self._loader.Load()) + events.pop(0) # Ignore the file_version event. + self.assertEqual(1, len(events)) + self.assertEqual('a', events[0].summary.value[0].tag) + os.remove(glob.glob(os.path.join(self._directory, '*.a'))[0]) + with test_util.FileWriter(self._directory, filename_suffix='.b') as writer_b: + writer_b.add_test_summary('b') + events = list(self._loader.Load()) + events.pop(0) # Ignore the file_version event. + self.assertEqual(1, len(events)) + self.assertEqual('b', events[0].summary.value[0].tag) + + def testDoesntCrashWhenUpcomingFileIsDeleted(self): + # Use actual file loader so it emits the real error. + self._loader = directory_loader.DirectoryLoader( + self._directory, event_file_loader.TimestampedEventFileLoader) + def make_summary(tag_name): + return summary_pb2.Summary( + value=[summary_pb2.Summary.Value(tag=tag_name, simple_value=1.0)]) + with test_util.FileWriter(self._directory, filename_suffix='.a') as writer_a: + writer_a.add_test_summary('a') + with test_util.FileWriter(self._directory, filename_suffix='.b') as writer_b: + writer_b.add_test_summary('b') + generator = self._loader.Load() + next(generator) # Ignore the file_version event. 
+ event = next(generator) + self.assertEqual('a', event.summary.value[0].tag) + os.remove(glob.glob(os.path.join(self._directory, '*.b'))[0]) + self.assertEmpty(list(generator)) def testRaisesDirectoryDeletedError_whenDirectoryIsDeleted(self): self._WriteToFile('a', 'a') From c8ab641787d55bbeb60fc986bef82ce897e9dd64 Mon Sep 17 00:00:00 2001 From: Nick Felt Date: Fri, 26 Jul 2019 18:07:29 -0700 Subject: [PATCH 10/11] remove extraneous make_summary() definitions --- .../backend/event_processing/directory_loader_test.py | 6 ------ 1 file changed, 6 deletions(-) diff --git a/tensorboard/backend/event_processing/directory_loader_test.py b/tensorboard/backend/event_processing/directory_loader_test.py index 9b87958fab..2a61b8670d 100644 --- a/tensorboard/backend/event_processing/directory_loader_test.py +++ b/tensorboard/backend/event_processing/directory_loader_test.py @@ -202,9 +202,6 @@ def testDoesntCrashWhenCurrentFileIsDeleted(self): # Use actual file loader so it emits the real error. self._loader = directory_loader.DirectoryLoader( self._directory, event_file_loader.TimestampedEventFileLoader) - def make_summary(tag_name): - return summary_pb2.Summary( - value=[summary_pb2.Summary.Value(tag=tag_name, simple_value=1.0)]) with test_util.FileWriter(self._directory, filename_suffix='.a') as writer_a: writer_a.add_test_summary('a') events = list(self._loader.Load()) @@ -223,9 +220,6 @@ def testDoesntCrashWhenUpcomingFileIsDeleted(self): # Use actual file loader so it emits the real error. self._loader = directory_loader.DirectoryLoader( self._directory, event_file_loader.TimestampedEventFileLoader) - def make_summary(tag_name): - return summary_pb2.Summary( - value=[summary_pb2.Summary.Value(tag=tag_name, simple_value=1.0)]) with test_util.FileWriter(self._directory, filename_suffix='.a') as writer_a: writer_a.add_test_summary('a') with test_util.FileWriter(self._directory, filename_suffix='.b') as writer_b: From 62350b3782437d2bb2ca2c1b0b9a9e2865770d13 Mon Sep 17 00:00:00 2001 From: Nick Felt Date: Mon, 29 Jul 2019 10:22:52 -0700 Subject: [PATCH 11/11] CR: adjust FAQ wording to clarify that polling criteria are changed vs expanded --- README.md | 22 +++++++++++++--------- 1 file changed, 13 insertions(+), 9 deletions(-) diff --git a/README.md b/README.md index 248e125964..c4e1a915d2 100644 --- a/README.md +++ b/README.md @@ -250,9 +250,11 @@ tensorboard in inspect mode to inspect the contents of your event files. ### TensorBoard is showing only some of my data, or isn't properly updating! > **Update:** the [experimental `--reload_multifile=true` option][pr-1867] can -> now be used to poll multiple files per directory for new data, rather than -> just the most recent one as described below. You may need to install our -> nightly build [`tb-nightly`][tb-nightly] for this option to be available. +> now be used to poll all "active" files in a directory for new data, rather +> than the most recent one as described below. A file is "active" as long as it +> received new data within `--reload_multifile_inactive_secs` seconds ago, +> defaulting to 4000. You may need to install our nightly build +> [`tb-nightly`][tb-nightly] for this option to be available. This issue usually comes about because of how TensorBoard iterates through the `tfevents` files: it progresses through the events file in timestamp order, and @@ -266,9 +268,10 @@ multiple summary writers, each one should be writing to a separate directory. ### Does TensorBoard support multiple or distributed summary writers? 
> **Update:** the [experimental `--reload_multifile=true` option][pr-1867] can -> now be used to poll multiple files per directory for new data. You may need -> to install our nightly build [`tb-nightly`][tb-nightly] for this option to -> be available. +> now be used to poll all "active" files in a directory for new data, defined as +> any file that received new data within `--reload_multifile_inactive_secs` +> seconds ago, defaulting to 4000. You may need to install our nightly build +> [`tb-nightly`][tb-nightly] for this option to be available. No. TensorBoard expects that only one events file will be written to at a time, and multiple summary writers means multiple events files. If you are running a @@ -286,9 +289,10 @@ with itself, there are a few possible explanations. directory. Please have each TensorFlow run write to its own logdir. > **Update:** the [experimental `--reload_multifile=true` option][pr-1867] can - > now be used to poll multiple files per directory for new data. You may need - > to install our nightly build [`tb-nightly`][tb-nightly] for this option to - > be available. + > now be used to poll all "active" files in a directory for new data, defined + > as any file that received new data within `--reload_multifile_inactive_secs` + > seconds ago, defaulting to 4000. You may need to install our nightly build + > [`tb-nightly`][tb-nightly] for this option to be available. * You may have a bug in your code where the global_step variable (passed to `FileWriter.add_summary`) is being maintained incorrectly.
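For concreteness, a hedged example of enabling the new behavior from the command line (the logdir and the one-day threshold are placeholders; the flag names and semantics are the ones added in this series):

```sh
# Poll every event file that has received data within the last day; files
# idle for longer are dropped from polling to limit resource usage.
tensorboard --logdir /tmp/logs \
  --reload_multifile=true \
  --reload_multifile_inactive_secs=86400
```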