From a69ab7bb70638af882acb8095f97794864815f0b Mon Sep 17 00:00:00 2001 From: ChanTsune <41658782+ChanTsune@users.noreply.github.com> Date: Mon, 2 Dec 2024 16:46:45 +0900 Subject: [PATCH] :racehorse: Add `ChunkExt::write_chunk_in_async`, `SealedEntryExt::write_in_async` for avoid copy --- lib/src/archive/write.rs | 5 +- lib/src/chunk.rs | 14 ++ lib/src/entry.rs | 364 +++++++++++++++++++++++++++++++++++++++ 3 files changed, 379 insertions(+), 4 deletions(-) diff --git a/lib/src/archive/write.rs b/lib/src/archive/write.rs index 4e45a34d0..7d4a31b00 100644 --- a/lib/src/archive/write.rs +++ b/lib/src/archive/write.rs @@ -315,10 +315,7 @@ impl Archive { /// This API is unstable. #[inline] pub async fn add_entry_async(&mut self, entry: impl Entry) -> io::Result { - let mut bytes = Vec::new(); - entry.write_in(&mut bytes)?; - self.inner.write_all(&bytes).await?; - Ok(bytes.len()) + entry.write_in_async(&mut self.inner).await } /// Write an end marker to finalize the archive. diff --git a/lib/src/chunk.rs b/lib/src/chunk.rs index adde8a50b..5437aaa15 100644 --- a/lib/src/chunk.rs +++ b/lib/src/chunk.rs @@ -44,6 +44,20 @@ pub(crate) trait ChunkExt: Chunk { Ok(self.bytes_len()) } + #[cfg(feature = "unstable-async")] + #[inline] + async fn write_chunk_in_async( + &self, + writer: &mut W, + ) -> io::Result { + use futures_util::AsyncWriteExt; + writer.write_all(&self.length().to_be_bytes()).await?; + writer.write_all(&self.ty().0).await?; + writer.write_all(self.data()).await?; + writer.write_all(&self.crc().to_be_bytes()).await?; + Ok(self.bytes_len()) + } + /// Convert the provided `Chunk` instance into a `Vec`. /// /// # Returns diff --git a/lib/src/entry.rs b/lib/src/entry.rs index ba06ef6ab..b2dd6b975 100644 --- a/lib/src/entry.rs +++ b/lib/src/entry.rs @@ -28,6 +28,11 @@ mod private { pub trait SealedEntryExt { fn into_chunks(self) -> Vec; fn write_in(&self, writer: &mut W) -> io::Result; + #[cfg(feature = "unstable-async")] + async fn write_in_async( + &self, + writer: &mut W, + ) -> io::Result; } } @@ -52,6 +57,19 @@ impl SealedEntryExt for RawEntry> { } Ok(total) } + + #[cfg(feature = "unstable-async")] + #[inline] + async fn write_in_async( + &self, + writer: &mut W, + ) -> io::Result { + let mut total = 0; + for chunk in self.0.iter() { + total += chunk.write_chunk_in_async(writer).await?; + } + Ok(total) + } } impl SealedEntryExt for RawEntry<&[u8]> { @@ -68,6 +86,19 @@ impl SealedEntryExt for RawEntry<&[u8]> { } Ok(total) } + + #[cfg(feature = "unstable-async")] + #[inline] + async fn write_in_async( + &self, + writer: &mut W, + ) -> io::Result { + let mut total = 0; + for chunk in self.0.iter() { + total += chunk.write_chunk_in_async(writer).await?; + } + Ok(total) + } } impl SealedEntryExt for RawEntry> { @@ -84,6 +115,19 @@ impl SealedEntryExt for RawEntry> { } Ok(total) } + + #[cfg(feature = "unstable-async")] + #[inline] + async fn write_in_async( + &self, + writer: &mut W, + ) -> io::Result { + let mut total = 0; + for chunk in self.0.iter() { + total += chunk.write_chunk_in_async(writer).await?; + } + Ok(total) + } } impl Entry for RawEntry where RawEntry: SealedEntryExt {} @@ -168,6 +212,18 @@ where ReadEntry::Solid(s) => s.write_in(writer), } } + + #[cfg(feature = "unstable-async")] + #[inline] + async fn write_in_async( + &self, + writer: &mut W, + ) -> io::Result { + match self { + ReadEntry::Normal(r) => r.write_in_async(writer).await, + ReadEntry::Solid(s) => s.write_in_async(writer).await, + } + } } impl Entry for ReadEntry where ReadEntry: SealedEntryExt {} @@ -313,6 +369,30 @@ impl SealedEntryExt for SolidEntry> { total += (ChunkType::SEND, []).write_chunk_in(writer)?; Ok(total) } + + #[cfg(feature = "unstable-async")] + async fn write_in_async( + &self, + writer: &mut W, + ) -> io::Result { + let mut total = 0; + total += (ChunkType::SHED, self.header.to_bytes()) + .write_chunk_in_async(writer) + .await?; + for extra_chunk in &self.extra { + total += extra_chunk.write_chunk_in_async(writer).await?; + } + if let Some(phsf) = &self.phsf { + total += (ChunkType::PHSF, phsf.as_bytes()) + .write_chunk_in_async(writer) + .await?; + } + for data in &self.data { + total += (ChunkType::SDAT, data).write_chunk_in_async(writer).await?; + } + total += (ChunkType::SEND, []).write_chunk_in_async(writer).await?; + Ok(total) + } } impl SealedEntryExt for SolidEntry<&[u8]> { @@ -346,6 +426,32 @@ impl SealedEntryExt for SolidEntry<&[u8]> { total += (ChunkType::SEND, []).write_chunk_in(writer)?; Ok(total) } + + #[cfg(feature = "unstable-async")] + async fn write_in_async( + &self, + writer: &mut W, + ) -> io::Result { + let mut total = 0; + total += (ChunkType::SHED, self.header.to_bytes()) + .write_chunk_in_async(writer) + .await?; + for extra_chunk in &self.extra { + total += extra_chunk.write_chunk_in_async(writer).await?; + } + if let Some(phsf) = &self.phsf { + total += (ChunkType::PHSF, phsf.as_bytes()) + .write_chunk_in_async(writer) + .await?; + } + for data in &self.data { + total += (ChunkType::SDAT, *data) + .write_chunk_in_async(writer) + .await?; + } + total += (ChunkType::SEND, []).write_chunk_in_async(writer).await?; + Ok(total) + } } impl SealedEntryExt for SolidEntry> { @@ -379,6 +485,32 @@ impl SealedEntryExt for SolidEntry> { total += (ChunkType::SEND, []).write_chunk_in(writer)?; Ok(total) } + + #[cfg(feature = "unstable-async")] + async fn write_in_async( + &self, + writer: &mut W, + ) -> io::Result { + let mut total = 0; + total += (ChunkType::SHED, self.header.to_bytes()) + .write_chunk_in_async(writer) + .await?; + for extra_chunk in &self.extra { + total += extra_chunk.write_chunk_in_async(writer).await?; + } + if let Some(phsf) = &self.phsf { + total += (ChunkType::PHSF, phsf.as_bytes()) + .write_chunk_in_async(writer) + .await?; + } + for data in &self.data { + total += (ChunkType::SDAT, data.as_ref()) + .write_chunk_in_async(writer) + .await?; + } + total += (ChunkType::SEND, []).write_chunk_in_async(writer).await?; + Ok(total) + } } impl Entry for SolidEntry where SolidEntry: SealedEntryExt {} @@ -769,6 +901,79 @@ impl SealedEntryExt for NormalEntry> { total += (ChunkType::FEND, []).write_chunk_in(writer)?; Ok(total) } + + #[cfg(feature = "unstable-async")] + #[inline] + async fn write_in_async( + &self, + writer: &mut W, + ) -> io::Result { + let mut total = 0; + + let Metadata { + raw_file_size, + compressed_size: _, + created, + modified, + accessed, + permission, + } = &self.metadata; + + total += (ChunkType::FHED, self.header.to_bytes()) + .write_chunk_in_async(writer) + .await?; + for ex in &self.extra { + total += ex.write_chunk_in_async(writer).await?; + } + if let Some(raw_file_size) = raw_file_size { + total += ( + ChunkType::fSIZ, + skip_while(&raw_file_size.to_be_bytes(), |i| *i == 0), + ) + .write_chunk_in_async(writer) + .await?; + } + + if let Some(p) = &self.phsf { + total += (ChunkType::PHSF, p.as_bytes()) + .write_chunk_in_async(writer) + .await?; + } + for data_chunk in &self.data { + for data_unit in data_chunk.chunks(u32::MAX as usize) { + total += (ChunkType::FDAT, data_unit) + .write_chunk_in_async(writer) + .await?; + } + } + if let Some(c) = created { + total += (ChunkType::cTIM, c.as_secs().to_be_bytes()) + .write_chunk_in_async(writer) + .await?; + } + if let Some(d) = modified { + total += (ChunkType::mTIM, d.as_secs().to_be_bytes()) + .write_chunk_in_async(writer) + .await?; + } + if let Some(a) = accessed { + total += (ChunkType::aTIM, a.as_secs().to_be_bytes()) + .write_chunk_in_async(writer) + .await?; + } + if let Some(p) = permission { + total += (ChunkType::fPRM, p.to_bytes()) + .write_chunk_in_async(writer) + .await?; + } + for xattr in &self.xattrs { + total += (ChunkType::xATR, xattr.to_bytes()) + .write_chunk_in_async(writer) + .await?; + } + total += (ChunkType::FEND, []).write_chunk_in_async(writer).await?; + Ok(total) + } } impl SealedEntryExt for NormalEntry<&[u8]> { @@ -877,6 +1082,79 @@ impl SealedEntryExt for NormalEntry<&[u8]> { total += (ChunkType::FEND, []).write_chunk_in(writer)?; Ok(total) } + + #[cfg(feature = "unstable-async")] + #[inline] + async fn write_in_async( + &self, + writer: &mut W, + ) -> io::Result { + let mut total = 0; + + let Metadata { + raw_file_size, + compressed_size: _, + created, + modified, + accessed, + permission, + } = &self.metadata; + + total += (ChunkType::FHED, self.header.to_bytes()) + .write_chunk_in_async(writer) + .await?; + for ex in &self.extra { + total += ex.write_chunk_in_async(writer).await?; + } + if let Some(raw_file_size) = raw_file_size { + total += ( + ChunkType::fSIZ, + skip_while(&raw_file_size.to_be_bytes(), |i| *i == 0), + ) + .write_chunk_in_async(writer) + .await?; + } + + if let Some(p) = &self.phsf { + total += (ChunkType::PHSF, p.as_bytes()) + .write_chunk_in_async(writer) + .await?; + } + for data_chunk in &self.data { + for data_unit in data_chunk.chunks(u32::MAX as usize) { + total += (ChunkType::FDAT, data_unit) + .write_chunk_in_async(writer) + .await?; + } + } + if let Some(c) = created { + total += (ChunkType::cTIM, c.as_secs().to_be_bytes()) + .write_chunk_in_async(writer) + .await?; + } + if let Some(d) = modified { + total += (ChunkType::mTIM, d.as_secs().to_be_bytes()) + .write_chunk_in_async(writer) + .await?; + } + if let Some(a) = accessed { + total += (ChunkType::aTIM, a.as_secs().to_be_bytes()) + .write_chunk_in_async(writer) + .await?; + } + if let Some(p) = permission { + total += (ChunkType::fPRM, p.to_bytes()) + .write_chunk_in_async(writer) + .await?; + } + for xattr in &self.xattrs { + total += (ChunkType::xATR, xattr.to_bytes()) + .write_chunk_in_async(writer) + .await?; + } + total += (ChunkType::FEND, []).write_chunk_in_async(writer).await?; + Ok(total) + } } impl SealedEntryExt for NormalEntry> { @@ -985,6 +1263,79 @@ impl SealedEntryExt for NormalEntry> { total += (ChunkType::FEND, []).write_chunk_in(writer)?; Ok(total) } + + #[cfg(feature = "unstable-async")] + #[inline] + async fn write_in_async( + &self, + writer: &mut W, + ) -> io::Result { + let mut total = 0; + + let Metadata { + raw_file_size, + compressed_size: _, + created, + modified, + accessed, + permission, + } = &self.metadata; + + total += (ChunkType::FHED, self.header.to_bytes()) + .write_chunk_in_async(writer) + .await?; + for ex in &self.extra { + total += ex.write_chunk_in_async(writer).await?; + } + if let Some(raw_file_size) = raw_file_size { + total += ( + ChunkType::fSIZ, + skip_while(&raw_file_size.to_be_bytes(), |i| *i == 0), + ) + .write_chunk_in_async(writer) + .await?; + } + + if let Some(p) = &self.phsf { + total += (ChunkType::PHSF, p.as_bytes()) + .write_chunk_in_async(writer) + .await?; + } + for data_chunk in &self.data { + for data_unit in data_chunk.chunks(u32::MAX as usize) { + total += (ChunkType::FDAT, data_unit) + .write_chunk_in_async(writer) + .await?; + } + } + if let Some(c) = created { + total += (ChunkType::cTIM, c.as_secs().to_be_bytes()) + .write_chunk_in_async(writer) + .await?; + } + if let Some(d) = modified { + total += (ChunkType::mTIM, d.as_secs().to_be_bytes()) + .write_chunk_in_async(writer) + .await?; + } + if let Some(a) = accessed { + total += (ChunkType::aTIM, a.as_secs().to_be_bytes()) + .write_chunk_in_async(writer) + .await?; + } + if let Some(p) = permission { + total += (ChunkType::fPRM, p.to_bytes()) + .write_chunk_in_async(writer) + .await?; + } + for xattr in &self.xattrs { + total += (ChunkType::xATR, xattr.to_bytes()) + .write_chunk_in_async(writer) + .await?; + } + total += (ChunkType::FEND, []).write_chunk_in_async(writer).await?; + Ok(total) + } } impl Entry for NormalEntry where NormalEntry: SealedEntryExt {} @@ -1280,6 +1631,19 @@ impl SealedEntryExt for ChunkSolidEntries { } Ok(total) } + + #[cfg(feature = "unstable-async")] + #[inline] + async fn write_in_async( + &self, + writer: &mut W, + ) -> io::Result { + let mut total = 0; + for chunk in self.0.iter() { + total += chunk.write_chunk_in_async(writer).await?; + } + Ok(total) + } } #[inline]