Unverified Commit 08a3181b authored by PengGao's avatar PengGao Committed by GitHub
Browse files

Fix save (#397)

parent 52ecf060
......@@ -57,11 +57,13 @@ class VARecorder:
# TCP socket for send and recv video and audio data
self.video_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.video_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.video_socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
self.video_socket.bind(("127.0.0.1", self.video_port))
self.video_socket.listen(1)
self.audio_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.audio_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.audio_socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
self.audio_socket.bind(("127.0.0.1", self.audio_port))
self.audio_socket.listen(1)
......@@ -81,15 +83,22 @@ class VARecorder:
break
# Convert audio data to 16-bit integer format
audios = torch.clamp(torch.round(data * 32767), -32768, 32767).to(torch.int16)
self.audio_conn.send(audios[None].cpu().numpy().tobytes())
try:
self.audio_conn.send(audios[None].cpu().numpy().tobytes())
except (BrokenPipeError, OSError, ConnectionResetError) as e:
logger.info(f"Audio connection closed, stopping worker: {type(e).__name__}")
return
fail_time = 0
except: # noqa
except (BrokenPipeError, OSError, ConnectionResetError):
logger.info("Audio connection closed during queue processing")
break
except Exception:
logger.error(f"Send audio data error: {traceback.format_exc()}")
fail_time += 1
if fail_time > max_fail_time:
logger.error(f"Audio push worker thread failed {fail_time} times, stopping...")
break
except: # noqa
except Exception:
logger.error(f"Audio push worker thread error: {traceback.format_exc()}")
finally:
logger.info("Audio push worker thread stopped")
......@@ -114,18 +123,25 @@ class VARecorder:
for i in range(data.shape[0]):
t0 = time.time()
frame = (data[i] * 255).clamp(0, 255).to(torch.uint8).cpu().numpy()
self.video_conn.send(frame.tobytes())
try:
self.video_conn.send(frame.tobytes())
except (BrokenPipeError, OSError, ConnectionResetError) as e:
logger.info(f"Video connection closed, stopping worker: {type(e).__name__}")
return
if self.realtime:
time.sleep(max(0, packet_secs - (time.time() - t0)))
fail_time = 0
except: # noqa
except (BrokenPipeError, OSError, ConnectionResetError):
logger.info("Video connection closed during queue processing")
break
except Exception:
logger.error(f"Send video data error: {traceback.format_exc()}")
fail_time += 1
if fail_time > max_fail_time:
logger.error(f"Video push worker thread failed {fail_time} times, stopping...")
break
except: # noqa
except Exception:
logger.error(f"Video push worker thread error: {traceback.format_exc()}")
finally:
logger.info("Video push worker thread stopped")
......@@ -367,15 +383,19 @@ class VARecorder:
if self.video_queue:
self.video_queue.put(None)
# Wait for threads to finish processing queued data (increased timeout)
queue_timeout = 30 # Increased from 5s to 30s to allow sufficient time for large video frames
if self.audio_thread and self.audio_thread.is_alive():
self.audio_thread.join(timeout=5)
self.audio_thread.join(timeout=queue_timeout)
if self.audio_thread.is_alive():
logger.warning("Audio push thread did not stop gracefully")
logger.error(f"Audio push thread did not stop after {queue_timeout}s")
if self.video_thread and self.video_thread.is_alive():
self.video_thread.join(timeout=5)
self.video_thread.join(timeout=queue_timeout)
if self.video_thread.is_alive():
logger.warning("Video push thread did not stop gracefully")
logger.error(f"Video push thread did not stop after {queue_timeout}s")
# Shutdown connections to signal EOF to FFmpeg
# shutdown(SHUT_WR) will wait for send buffer to flush, no explicit sleep needed
if self.audio_conn:
try:
self.audio_conn.getpeername()
......@@ -396,21 +416,28 @@ class VARecorder:
if self.ffmpeg_process:
is_local_file = not self.livestream_url.startswith(("rtmp://", "http"))
timeout_seconds = 15 if is_local_file else 10
logger.info(f"Waiting for FFmpeg to finalize (timeout={timeout_seconds}s, local_file={is_local_file})")
# Local MP4 files need time to write moov atom and finalize the container
timeout_seconds = 30 if is_local_file else 10
logger.info(f"Waiting for FFmpeg to finalize file (timeout={timeout_seconds}s, local_file={is_local_file})")
logger.info(f"FFmpeg output: {self.livestream_url}")
try:
self.ffmpeg_process.wait(timeout=timeout_seconds)
logger.info("FFmpeg process exited gracefully")
returncode = self.ffmpeg_process.wait(timeout=timeout_seconds)
if returncode == 0:
logger.info(f"FFmpeg process exited successfully (exit code: {returncode})")
else:
logger.warning(f"FFmpeg process exited with non-zero code: {returncode}")
except subprocess.TimeoutExpired:
logger.warning(f"FFmpeg process did not exit within {timeout_seconds}s, sending SIGTERM...")
try:
self.ffmpeg_process.terminate() # SIGTERM
self.ffmpeg_process.wait(timeout=3)
logger.warning("FFmpeg process terminated with SIGTERM")
returncode = self.ffmpeg_process.wait(timeout=5)
logger.warning(f"FFmpeg process terminated with SIGTERM (exit code: {returncode})")
except subprocess.TimeoutExpired:
logger.error("FFmpeg process still running, killing with SIGKILL...")
logger.error("FFmpeg process still running after SIGTERM, killing with SIGKILL...")
self.ffmpeg_process.kill()
self.ffmpeg_process.wait() # Wait for kill to complete
logger.error("FFmpeg process killed with SIGKILL")
finally:
self.ffmpeg_process = None
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment