1use crate::connection::ConnectionRef;
4use crate::types::{BorrowToSql, ToSql, Type};
5use crate::{CopyInWriter, CopyOutReader, Error};
6use fallible_iterator::FallibleIterator;
7use futures_util::StreamExt;
8use std::pin::Pin;
9#[doc(inline)]
10pub use tokio_postgres::binary_copy::BinaryCopyOutRow;
11use tokio_postgres::binary_copy::{self, BinaryCopyOutStream};
12
13pub struct BinaryCopyInWriter<'a> {
17 connection: ConnectionRef<'a>,
18 sink: Pin<Box<binary_copy::BinaryCopyInWriter>>,
19}
20
21impl<'a> BinaryCopyInWriter<'a> {
22 pub fn new(writer: CopyInWriter<'a>, types: &[Type]) -> BinaryCopyInWriter<'a> {
24 let stream = writer
25 .sink
26 .into_unpinned()
27 .expect("writer has already been written to");
28
29 BinaryCopyInWriter {
30 connection: writer.connection,
31 sink: Box::pin(binary_copy::BinaryCopyInWriter::new(stream, types)),
32 }
33 }
34
35 pub fn write(&mut self, values: &[&(dyn ToSql + Sync)]) -> Result<(), Error> {
41 self.connection.block_on(self.sink.as_mut().write(values))
42 }
43
44 pub fn write_raw<P, I>(&mut self, values: I) -> Result<(), Error>
50 where
51 P: BorrowToSql,
52 I: IntoIterator<Item = P>,
53 I::IntoIter: ExactSizeIterator,
54 {
55 self.connection
56 .block_on(self.sink.as_mut().write_raw(values))
57 }
58
59 pub fn finish(mut self) -> Result<u64, Error> {
63 self.connection.block_on(self.sink.as_mut().finish())
64 }
65}
66
67pub struct BinaryCopyOutIter<'a> {
69 connection: ConnectionRef<'a>,
70 stream: Pin<Box<BinaryCopyOutStream>>,
71}
72
73impl<'a> BinaryCopyOutIter<'a> {
74 pub fn new(reader: CopyOutReader<'a>, types: &[Type]) -> BinaryCopyOutIter<'a> {
76 let stream = reader
77 .stream
78 .into_unpinned()
79 .expect("reader has already been read from");
80
81 BinaryCopyOutIter {
82 connection: reader.connection,
83 stream: Box::pin(BinaryCopyOutStream::new(stream, types)),
84 }
85 }
86}
87
88impl FallibleIterator for BinaryCopyOutIter<'_> {
89 type Item = BinaryCopyOutRow;
90 type Error = Error;
91
92 fn next(&mut self) -> Result<Option<BinaryCopyOutRow>, Error> {
93 let stream = &mut self.stream;
94 self.connection
95 .block_on(async { stream.next().await.transpose() })
96 }
97}