Commit a3ef6dcb by ento Committed by GitHub

Fix executor deadlock on IO errors

This change ensures that the executor does not deadlock on IO errors,
for example when IO does not support non-ascii characters.
parent d2fd581c
...@@ -131,32 +131,29 @@ class Executor(object): ...@@ -131,32 +131,29 @@ class Executor(object):
return return
if self._io.is_debug(): if self._io.is_debug():
self._lock.acquire() with self._lock:
section = self._sections[id(operation)] section = self._sections[id(operation)]
section.write_line(line) section.write_line(line)
self._lock.release()
return return
self._lock.acquire() with self._lock:
section = self._sections[id(operation)] section = self._sections[id(operation)]
section.output.clear() section.output.clear()
section.write(line) section.write(line)
self._lock.release()
def _execute_operation(self, operation): def _execute_operation(self, operation):
try: try:
if self.supports_fancy_output(): if self.supports_fancy_output():
if id(operation) not in self._sections: if id(operation) not in self._sections:
if self._should_write_operation(operation): if self._should_write_operation(operation):
self._lock.acquire() with self._lock:
self._sections[id(operation)] = self._io.section() self._sections[id(operation)] = self._io.section()
self._sections[id(operation)].write_line( self._sections[id(operation)].write_line(
" <fg=blue;options=bold>•</> {message}: <fg=blue>Pending...</>".format( " <fg=blue;options=bold>•</> {message}: <fg=blue>Pending...</>".format(
message=self.get_operation_message(operation), message=self.get_operation_message(operation),
), ),
) )
self._lock.release()
else: else:
if self._should_write_operation(operation): if self._should_write_operation(operation):
if not operation.skipped: if not operation.skipped:
...@@ -190,37 +187,37 @@ class Executor(object): ...@@ -190,37 +187,37 @@ class Executor(object):
if result == -2: if result == -2:
raise KeyboardInterrupt raise KeyboardInterrupt
except Exception as e: except Exception as e:
from clikit.ui.components.exception_trace import ExceptionTrace try:
from clikit.ui.components.exception_trace import ExceptionTrace
if not self.supports_fancy_output():
io = self._io
else:
message = " <error>•</error> {message}: <error>Failed</error>".format(
message=self.get_operation_message(operation, error=True),
)
self._write(operation, message)
io = self._sections.get(id(operation), self._io)
self._lock.acquire()
trace = ExceptionTrace(e)
trace.render(io)
io.write_line("")
self._shutdown = True if not self.supports_fancy_output():
self._lock.release() io = self._io
else:
message = " <error>•</error> {message}: <error>Failed</error>".format(
message=self.get_operation_message(operation, error=True),
)
self._write(operation, message)
io = self._sections.get(id(operation), self._io)
with self._lock:
trace = ExceptionTrace(e)
trace.render(io)
io.write_line("")
finally:
with self._lock:
self._shutdown = True
except KeyboardInterrupt: except KeyboardInterrupt:
message = " <warning>•</warning> {message}: <warning>Cancelled</warning>".format( try:
message=self.get_operation_message(operation, warning=True), message = " <warning>•</warning> {message}: <warning>Cancelled</warning>".format(
) message=self.get_operation_message(operation, warning=True),
if not self.supports_fancy_output(): )
self._io.write_line(message) if not self.supports_fancy_output():
else: self._io.write_line(message)
self._write(operation, message) else:
self._write(operation, message)
self._lock.acquire() finally:
self._shutdown = True with self._lock:
self._lock.release() self._shutdown = True
def _do_execute_operation(self, operation): def _do_execute_operation(self, operation):
method = operation.job_type method = operation.job_type
...@@ -266,14 +263,12 @@ class Executor(object): ...@@ -266,14 +263,12 @@ class Executor(object):
return result return result
def _increment_operations_count(self, operation, executed): def _increment_operations_count(self, operation, executed):
self._lock.acquire() with self._lock:
if executed: if executed:
self._executed_operations += 1 self._executed_operations += 1
self._executed[operation.job_type] += 1 self._executed[operation.job_type] += 1
else: else:
self._skipped[operation.job_type] += 1 self._skipped[operation.job_type] += 1
self._lock.release()
def run_pip(self, *args, **kwargs): # type: (...) -> int def run_pip(self, *args, **kwargs): # type: (...) -> int
try: try:
...@@ -622,9 +617,8 @@ class Executor(object): ...@@ -622,9 +617,8 @@ class Executor(object):
progress.set_format(message + " <b>%percent%%</b>") progress.set_format(message + " <b>%percent%%</b>")
if progress: if progress:
self._lock.acquire() with self._lock:
progress.start() progress.start()
self._lock.release()
done = 0 done = 0
archive = self._chef.get_cache_directory_for_link(link) / link.filename archive = self._chef.get_cache_directory_for_link(link) / link.filename
...@@ -637,16 +631,14 @@ class Executor(object): ...@@ -637,16 +631,14 @@ class Executor(object):
done += len(chunk) done += len(chunk)
if progress: if progress:
self._lock.acquire() with self._lock:
progress.set_progress(done) progress.set_progress(done)
self._lock.release()
f.write(chunk) f.write(chunk)
if progress: if progress:
self._lock.acquire() with self._lock:
progress.finish() progress.finish()
self._lock.release()
return archive return archive
......
...@@ -185,6 +185,32 @@ Package operations: 1 install, 0 updates, 0 removals ...@@ -185,6 +185,32 @@ Package operations: 1 install, 0 updates, 0 removals
assert expected == io.fetch_output() assert expected == io.fetch_output()
def test_execute_should_gracefully_handle_io_error(config, mocker, io):
env = MockEnv()
executor = Executor(env, pool, config, io)
executor.verbose()
original_write_line = executor._io.write_line
def write_line(string, flags=None):
# Simulate UnicodeEncodeError
string.encode("ascii")
original_write_line(string, flags)
mocker.patch.object(io, "write_line", side_effect=write_line)
assert 1 == executor.execute([Install(Package("clikit", "0.2.3"))])
expected = r"""
Package operations: 1 install, 0 updates, 0 removals
\s*Unicode\w+Error
"""
assert re.match(expected, io.fetch_output())
def test_executor_should_delete_incomplete_downloads( def test_executor_should_delete_incomplete_downloads(
config, io, tmp_dir, mocker, pool, mock_file_downloads config, io, tmp_dir, mocker, pool, mock_file_downloads
): ):
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment