telodendria/src/Stream.c

509 lines
8.7 KiB
C
Raw Normal View History

2023-03-16 12:29:38 +00:00
/*
* Copyright (C) 2022-2023 Jordan Bancino <@jordan:bancino.net>
*
* Permission is hereby granted, free of charge, to any person
* obtaining a copy of this software and associated documentation files
* (the "Software"), to deal in the Software without restriction,
* including without limitation the rights to use, copy, modify, merge,
* publish, distribute, sublicense, and/or sell copies of the Software,
* and to permit persons to whom the Software is furnished to do so,
* subject to the following conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
* BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
* ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
2023-03-15 02:40:23 +00:00
#include <Stream.h>
#include <Io.h>
#include <Memory.h>
#include <Util.h>
#include <stdio.h>
#include <errno.h>
#include <string.h>
#include <sys/stat.h>
#ifndef STREAM_RETRIES
#define STREAM_RETRIES 10
#endif
#ifndef STREAM_DELAY
#define STREAM_DELAY 2
#endif
#define STREAM_EOF (1 << 0)
#define STREAM_ERR (1 << 1)
2023-03-15 02:40:23 +00:00
struct Stream
{
Io *io;
int *rBuf;
size_t rLen;
size_t rOff;
2023-03-15 02:40:23 +00:00
int *wBuf;
size_t wLen;
int *ugBuf;
size_t ugSize;
size_t ugLen;
int flags:2;
2023-03-15 02:40:23 +00:00
};
Stream *
StreamIo(Io * io)
2023-03-15 02:40:23 +00:00
{
Stream *stream;
if (!io)
2023-03-15 02:40:23 +00:00
{
return NULL;
}
stream = Malloc(sizeof(Stream));
if (!stream)
{
return NULL;
}
memset(stream, 0, sizeof(Stream));
stream->io = io;
2023-03-15 02:40:23 +00:00
return stream;
}
Stream *
StreamFd(int fd)
{
Io *io = IoFd(fd);
if (!io)
{
return NULL;
}
return StreamIo(io);
}
Stream *
2023-03-16 16:25:24 +00:00
StreamFile(FILE *fp)
{
2023-03-16 16:25:24 +00:00
Io *io = IoFile(fp);
2023-03-16 16:25:24 +00:00
if (!io)
{
return NULL;
}
2023-03-16 16:25:24 +00:00
return StreamIo(io);
}
2023-03-16 16:25:24 +00:00
Stream *
StreamOpen(const char *path, const char *mode)
{
FILE *fp = fopen(path, mode);
if (!fp)
{
return NULL;
}
2023-03-16 16:25:24 +00:00
return StreamFile(fp);
}
int
StreamClose(Stream * stream)
2023-03-15 02:40:23 +00:00
{
int ret = 0;
2023-03-15 02:40:23 +00:00
if (!stream)
{
errno = EBADF;
return EOF;
2023-03-15 02:40:23 +00:00
}
if (stream->rBuf)
{
Free(stream->rBuf);
}
2023-03-15 02:40:23 +00:00
if (stream->wBuf)
2023-03-15 02:40:23 +00:00
{
ssize_t writeRes = IoWrite(stream->io, stream->wBuf, stream->wLen);
Free(stream->wBuf);
if (writeRes == -1)
{
ret = EOF;
}
}
if (stream->ugBuf)
{
Free(stream->ugBuf);
2023-03-15 02:40:23 +00:00
}
ret = IoClose(stream->io);
Free(stream);
return ret;
2023-03-15 02:40:23 +00:00
}
int
StreamVprintf(Stream * stream, const char *fmt, va_list ap)
2023-03-15 02:40:23 +00:00
{
/* This might look like very similar code to IoVprintf(),
* but I chose not to defer to IoVprintf() because that
* would require us to immediately flush the buffer, since
* the Io API is unbuffered. StreamPuts() uses StreamPutc()
* under the hood, which is buffered. It therefore allows
* us to finish filling the buffer and then only flush it
* when necessary, preventing superfluous writes.
*/
char *buf;
ssize_t len;
int ret;
if (!stream || !fmt)
2023-03-15 02:40:23 +00:00
{
return -1;
}
buf = Malloc(IO_BUFFER);
if (!buf)
{
return -1;
}
len = vsnprintf(buf, IO_BUFFER, fmt, ap);
if (len < 0)
{
Free(buf);
return len;
}
if (len >= IO_BUFFER)
{
char *new = Realloc(buf, len + 1);
if (!new)
{
Free(buf);
return -1;
}
buf = new;
vsnprintf(buf, len, fmt, ap);
}
ret = StreamPuts(stream, buf);
2023-03-15 02:40:23 +00:00
Free(buf);
return ret;
2023-03-15 02:40:23 +00:00
}
int
StreamPrintf(Stream * stream, const char *fmt,...)
2023-03-15 02:40:23 +00:00
{
int ret;
va_list ap;
va_start(ap, fmt);
ret = StreamVprintf(stream, fmt, ap);
va_end(ap);
return ret;
}
int
StreamGetc(Stream * stream)
{
int c;
2023-03-15 02:40:23 +00:00
if (!stream)
{
errno = EBADF;
return EOF;
2023-03-15 02:40:23 +00:00
}
/* Empty the ungetc stack first */
if (stream->ugLen)
2023-03-15 02:40:23 +00:00
{
c = stream->ugBuf[stream->ugLen - 1];
stream->ugLen--;
return c;
2023-03-15 02:40:23 +00:00
}
if (stream->flags & EOF)
2023-03-15 02:40:23 +00:00
{
return EOF;
2023-03-15 02:40:23 +00:00
}
if (!stream->rBuf)
2023-03-15 02:40:23 +00:00
{
/* No buffer allocated yet */
stream->rBuf = Malloc(IO_BUFFER * sizeof(int));
if (!stream->rBuf)
{
stream->flags |= STREAM_ERR;
return EOF;
}
stream->rOff = 0;
stream->rLen = 0;
2023-03-15 02:40:23 +00:00
}
if (stream->rOff >= stream->rLen)
{
/* We read through the entire buffer; get a new one */
ssize_t readRes = IoRead(stream->io, stream->rBuf, IO_BUFFER);
if (readRes == 0)
{
stream->flags |= STREAM_EOF;
return EOF;
}
if (readRes == -1)
{
stream->flags |= STREAM_ERR;
return EOF;
}
stream->rOff = 0;
stream->rLen = readRes;
}
2023-03-15 02:40:23 +00:00
/* Read the character in the buffer and advance the offset */
c = stream->rBuf[stream->rOff];
stream->rOff++;
return c;
2023-03-15 02:40:23 +00:00
}
int
StreamUngetc(Stream * stream, int c)
2023-03-15 02:40:23 +00:00
{
if (!stream)
{
errno = EBADF;
return EOF;
}
2023-03-15 02:40:23 +00:00
if (!stream->ugBuf)
{
stream->ugSize = IO_BUFFER;
stream->ugBuf = Malloc(stream->ugSize);
if (!stream->ugBuf)
{
stream->flags |= STREAM_ERR;
return EOF;
}
}
2023-03-15 02:40:23 +00:00
if (stream->ugLen >= stream->ugSize)
{
int *new;
stream->ugSize += IO_BUFFER;
new = Realloc(stream->ugBuf, stream->ugSize);
if (!new)
{
stream->flags |= STREAM_ERR;
Free(stream->ugBuf);
stream->ugBuf = NULL;
return EOF;
}
Free(stream->ugBuf);
stream->ugBuf = new;
}
stream->ugBuf[stream->ugLen - 1] = c;
stream->ugLen++;
2023-03-15 02:40:23 +00:00
return c;
2023-03-15 02:40:23 +00:00
}
int
StreamPutc(Stream * stream, int c)
2023-03-15 02:40:23 +00:00
{
if (!stream)
{
errno = EBADF;
return EOF;
}
if (!stream->wBuf)
{
stream->wBuf = Malloc(IO_BUFFER * sizeof(int));
if (!stream->wBuf)
{
stream->flags |= STREAM_ERR;
return EOF;
}
}
2023-03-15 02:40:23 +00:00
if (stream->wLen == IO_BUFFER)
{
/* Buffer full; write it */
ssize_t writeRes = IoWrite(stream->io, stream->wBuf, stream->wLen);
if (writeRes == -1)
{
stream->flags |= STREAM_ERR;
return EOF;
}
stream->wLen = 0;
}
stream->wBuf[stream->wLen] = c;
stream->wLen++;
return c;
2023-03-15 02:40:23 +00:00
}
int
StreamPuts(Stream *stream, char *str)
{
int ret = 0;
if (!stream)
{
errno = EBADF;
return -1;
}
while (*str)
{
if (StreamPutc(stream, *str) == EOF)
{
ret = -1;
break;
}
str++;
}
return ret;
}
int
StreamEof(Stream * stream)
2023-03-15 02:40:23 +00:00
{
return stream && (stream->flags & STREAM_EOF);
}
2023-03-15 02:40:23 +00:00
int
StreamError(Stream * stream)
{
return stream && (stream->flags & STREAM_ERR);
2023-03-15 02:40:23 +00:00
}
void
StreamClearError(Stream * stream)
2023-03-15 02:40:23 +00:00
{
if (stream)
{
stream->flags &= ~STREAM_ERR;
}
}
2023-03-15 02:40:23 +00:00
int
StreamFlush(Stream * stream)
{
if (!stream)
2023-03-15 02:40:23 +00:00
{
errno = EBADF;
return EOF;
2023-03-15 02:40:23 +00:00
}
if (stream->wLen)
{
ssize_t writeRes = IoWrite(stream->io, stream->wBuf, stream->wLen);
2023-03-15 02:40:23 +00:00
if (writeRes == -1)
{
stream->flags |= STREAM_ERR;
return EOF;
}
2023-03-15 02:40:23 +00:00
stream->wLen = 0;
}
2023-03-15 02:40:23 +00:00
return 0;
2023-03-15 02:40:23 +00:00
}
ssize_t
StreamCopy(Stream * in, Stream * out)
{
ssize_t nBytes = 0;
int c;
int tries = 0;
int readFlg = 0;
while (1)
{
c = StreamGetc(in);
if (StreamEof(in))
{
break;
}
if (StreamError(in))
{
if (errno == EAGAIN)
{
StreamClearError(in);
tries++;
if (tries >= STREAM_RETRIES || readFlg)
{
break;
}
else
{
UtilSleepMillis(STREAM_DELAY);
continue;
}
}
else
{
break;
}
}
/* As soon as we've successfully read a byte, treat future
* EAGAINs as EOF, because somebody might have forgotten to
* close their stream. */
readFlg = 1;
tries = 0;
StreamPutc(out, c);
nBytes++;
}
StreamFlush(out);
return nBytes;
}