module easyd.stream;

// (C) 2014-2019 by Matthias Rossmy
// This file is distributed under the "Fair Use License v2"

import std.stdio;
import std.traits;
import std.conv;
import std.file;
import std.path;
import std.algorithm.comparison;

import easyd.base;
import easyd.string;

unittest
{
    auto stream = new BufferStream;
    int x=123;
    stream.write(x);
    assert(stream.unreadBytes==4);
    assert(stream.read!int==123);
    assert(stream.unreadBytes==0);
}

unittest
{
    // A read position beyond the end of the data must behave like end-of-stream
    auto ram = new RamStream;
    ram.writeBuf([1,2,3]);
    ram.seekRead(10);
    Byte[] buf;
    bool more = ram.readBuf(buf,4);
    assert(!more);
    assert(buf.length==0);
}

unittest
{
    // A truncated value must raise the documented exception instead of being decoded
    auto ram = new RamStream;
    ram.writeBuf([1,2]);
    bool thrown = false;
    try
    {
        int v = ram.read!int;
    }
    catch(Exception e)
    {
        thrown = true;
    }
    assert(thrown);
}

/// Source of bytes that is consumed sequentially.
interface IReadStream
{
    /**
     * Reads up to maxSize bytes into buf; buf.length is set to the number of
     * bytes actually delivered.
     * Returns: false, if end of stream is reached
     */
    bool readBuf(ref Byte[] buf, ulong maxSize);
}

/// Probes the stream with a zero-byte read and reports whether it signalled its end.
bool isAtEnd(IReadStream s)
{
    Byte[] buf;
    return !s.readBuf(buf,0);
}

/**
 * Reads one value of a basic type from the stream.
 * Throws: Exception if the stream reports its end or delivers fewer than
 *         T.sizeof bytes.
 */
T read(T)(IReadStream s)
    if(isBasicType!T)
{
    Byte[] buf;
    // A short read must not be decoded: the buffer would be smaller than T.
    if(s.readBuf(buf,T.sizeof) && buf.length==T.sizeof)
    {
        return buf.readFromPos!T(0);
    }
    throw new Exception("Read from stream failed");
}

/**
 * Reads a string. With bufsize==0 the length is taken from the stream
 * (see readVarBits), otherwise up to bufsize bytes are read.
 */
T read(T)(IReadStream s, ulong bufsize=0)
    if(is(T:string))
{
    ubyte[] ba;
    ba.length = bufsize;
    s.read(ba,bufsize==0);
    return cast(string)(ba.idup);
}

/**
 * Reads a byte array. With bufsize==0 the length is taken from the stream
 * (see readVarBits), otherwise up to bufsize bytes are read.
 */
T read(T)(IReadStream s, ulong bufsize=0)
    if(is(T:Byte[]))
{
    ubyte[] ba;
    ba.length = bufsize;
    s.read(ba,bufsize==0);
    return ba;
}

/**
 * Reads a length written by IWriteStream.writeVarBits: a single byte below 255
 * is the value itself, the marker byte 255 is followed by a full ulong.
 */
ulong readVarBits(IReadStream s)
{
    ubyte b = s.read!ubyte;
    if(b<255)
    {
        return b;
    }
    return s.read!ulong;
}

/**
 * Fills buffer from the stream.
 * Params:
 *   buffer     = destination; its current length is the number of elements to read
 *   evalLength = if true, buffer is first resized to a length read from the stream
 * Returns: false if reading an element raised an Exception. For byte buffers the
 *          result of readBuf is returned. An Exception while reading the length
 *          itself is not caught here.
 */
bool read(T)(IReadStream s, ref T[] buffer, bool evalLength)
{
    if(evalLength) buffer.length = s.readVarBits;
    try
    {
        static if(is(T:ubyte))
        {
            return s.readBuf(buffer,buffer.length);
        }else{
            foreach(ref item;buffer)
            {
                item = s.read!(typeof(item));
            }
            return true;
        }
    }
    catch(Exception e)
    {
        return false;
    }
}

/**
 * Reads characters up to the next '\n' or until reading fails.
 * Params:
 *   includeNewLine = keep the terminating '\n' in the result
 *   bom            = byte sequence removed from the start of the line when the
 *                    line begins with all of it (default: UTF-8 byte order mark)
 * Note: a failed read just ends the line, so the end of the stream cannot be
 * distinguished from an empty line by the return value alone.
 */
string readLine(IReadStream s, bool includeNewLine=false, Byte[] bom=[239,187,191])
{
    char[] a;
    while(true)
    {
        char c;
        try
        {
            c = s.read!char;
        }
        catch(Exception e)
        {
            break;
        }
        if(c=='\n')
        {
            if(includeNewLine) a ~= c;
            break;
        }
        a ~= c;
    }
    // Only strip the BOM when the whole BOM is present; shorter lines are returned untouched.
    if(a.length<bom.length) return a.idup;
    foreach(i, b; bom)
    {
        if(a[i].to!Byte!=b) return a.idup;
    }
    return a.idup.subStr(bom.length);
}

/// Sink of bytes; concrete streams implement writeBuf.
abstract class IWriteStream
{
    /// Writes all bytes of buf; returns whether the write succeeded.
    bool writeBuf(const Byte[] buf);

    /// Signals that no more data will be written. The default does nothing.
    void finish(){}

    /// Writes one value of a basic type (encoding is done by writeToPos).
    void write(T)(T data)
        if(isBasicType!T)
    {
        Byte[] buf;
        buf.length = T.sizeof;
        buf.writeToPos(0,data);
        writeBuf(buf);
    }

    /**
     * Writes a length in a compact form: values below 255 take one byte,
     * everything else is the marker byte 255 followed by the full ulong.
     */
    void writeVarBits(ulong x)
    {
        if(x<255)
        {
            write(cast(ubyte)x);
        }else{
            ubyte m = 255;
            write(m);
            write(x);
        }
    }

    /**
     * Writes an array, optionally preceded by its length (see writeVarBits).
     * Byte-like and char arrays are written in one piece, other element types
     * are written element by element.
     */
    void write(T)(const T[] data, bool includeLength)
    {
        if(includeLength) writeVarBits(data.length);
        static if(is(T:ubyte) || is(T:char))
        {
            // keep the data const, writeBuf only reads it
            writeBuf(cast(const(ubyte)[])data);
        }else{
            foreach(item;data)
            {
                write(item);
            }
        }
    }

    /// Writes s without a length prefix, followed by '\n'.
    void writeLine(string s)
    {
        write(s,false);
        write('\n');
    }
}

/// Stream that can be both read and written.
abstract class IReadWriteStream : IWriteStream, IReadStream
{}

/// Stream with independently settable read and write positions.
interface ISeekableStream
{
    void seekRead(ulong pos);
    void seekWrite(ulong pos);
}

/// Readable, writable and seekable stream.
abstract class ISeekableReadWriteStream : IReadWriteStream, ISeekableStream
{}

/// Stream over a byte array in memory with separate read and write positions.
class RamStream : ISeekableReadWriteStream
{
    Byte[] data;
    ulong readPos=0;
    ulong writePos=0;

    /// Copies up to maxSize bytes starting at readPos into buf and advances readPos.
    bool readBuf(ref Byte[] buf, ulong maxSize)
    {
        // readPos may have been moved past the end by seekRead; avoid unsigned underflow
        ulong available = readPos<data.length ? data.length-readPos : 0;
        buf.length = min(maxSize, available);
        foreach(ref b; buf)
        {
            b = data[readPos++];
        }
        return buf.length>0 || readPos<data.length;
    }

    void seekRead(ulong pos)
    {
        readPos = pos;
    }

    /// Writes buf at writePos, overwriting existing bytes and growing data as needed.
    override bool writeBuf(const Byte[] buf)
    {
        if(writePos==data.length)
        {
            // plain append at the end
            data ~= buf;
            writePos += buf.length;
        }else{
            ulong neededlength = writePos + buf.length;
            if(data.length<neededlength) data.length=neededlength;
            foreach(b; buf)
            {
                data[writePos++] = b;
            }
        }
        return true;
    }

    void seekWrite(ulong pos)
    {
        writePos = pos;
    }

    /// Discards all data and rewinds both positions.
    void reset()
    {
        data.length=0;
        writePos=0;
        readPos=0;
    }
}

/// Wraps the array in a RamStream; the stream references a directly (no copy is made here).
RamStream toStream(Byte[] a)
{
    auto result = new RamStream;
    result.data = a;
    return result;
}

/// FIFO byte buffer: written bytes are appended, read bytes are removed from the front.
class BufferStream : IReadWriteStream
{
    bool finished=false;
    protected Byte[] data;

    /// Copies up to maxSize unread bytes into buf without consuming them.
    void peek(ref Byte[] buf, ulong maxSize)
    {
        buf.length = min(maxSize,unreadBytes);
        foreach(i, ref b; buf)
        {
            b = data[i];
        }
    }

    /**
     * Delivers and consumes up to maxSize bytes.
     * Returns: false only if nothing was delivered, nothing is left and finish() was called.
     */
    bool readBuf(ref Byte[] buf, ulong maxSize)
    {
        peek(buf,maxSize);
        if(buf.length>0)
        {
            // move the remaining bytes to the front, then drop the tail
            immutable size_t remaining = data.length - buf.length;
            for(size_t x=0; x<remaining; x++)
            {
                data[x] = data[x+buf.length];
            }
            data.length = remaining;
        }
        return buf.length>0 || unreadBytes>0 || !finished;
    }

    override bool writeBuf(const Byte[] buf)
    {
        data ~= buf;
        return true;
    }

    /// Marks the stream as complete, so that readers see the end once the buffer is drained.
    override void finish()
    {
        finished = true;
    }

    /// Number of bytes written but not yet read.
    ulong unreadBytes()
    {
        return data.length;
    }

    /// Discards all buffered data and clears the finished flag.
    void reset()
    {
        data.length=0;
        finished=false;
    }
}

/// Read-only stream over a file.
class FileReader : IReadStream, ISeekableStream
{
    File file;

    /**
     * Opens fileName for reading.
     * Params:
     *   allowStdIn = if true, the file name "-" selects standard input
     *                (counterpart of allowStdOut in FileWriter)
     */
    this(string fileName, bool allowStdIn=false)
    {
        if(allowStdIn && fileName=="-")
        {
            file = stdin;
        }else{
            file = File(fileName,"r");
        }
    }

    protected this(){}

    /// Reads up to maxSize bytes; buf is shrunk to the number of bytes actually read.
    bool readBuf(ref Byte[] buf, ulong maxSize)
    {
        buf.length = maxSize;
        if(maxSize>0) // rawRead is skipped for an empty buffer
        {
            auto slice = file.rawRead(buf);
            buf.length = slice.length;
        }
        return buf.length>0 || !file.eof;
    }

    void seekRead(ulong pos)
    {
        file.seek(pos);
    }

    /// Not supported; always throws.
    void seekWrite(ulong pos)
    {
        throw new Exception("FileReader does not support writing");
    }
}

/// Write-only stream over a file.
class FileWriter : IWriteStream, ISeekableStream
{
    protected File file;

    /**
     * Opens fileName for writing, creating its directory if necessary.
     * Params:
     *   append      = append to an existing file instead of truncating it
     *   allowStdOut = if true, the file name "-" selects standard output
     */
    this(string fileName, bool append=false, bool allowStdOut=false)
    {
        if(allowStdOut && fileName=="-")
        {
            file = stdout;
        }else{
            mkdirRecurse(fileName.dirName);
            file = File(fileName,append?"a":"w");
        }
    }

    protected this(){}

    /// Writes buf to the file. NOTE(review): Try comes from easyd.base; presumably it
    /// turns a thrown exception into a false result - confirm there.
    override bool writeBuf(const Byte[] buf)
    {
        return Try(file.rawWrite(buf));
    }

    /// Closes the file.
    override void finish()
    {
        file.close;
    }

    void seekWrite(ulong pos)
    {
        file.seek(pos);
    }

    /// Not supported; always throws.
    void seekRead(ulong pos)
    {
        throw new Exception("FileWriter does not support reading");
    }
}