Skip to content

Commit

Permalink
feat: core and basic modules
Browse files Browse the repository at this point in the history
  • Loading branch information
willfarrell committed Jul 24, 2022
1 parent e753856 commit 48cb52c
Show file tree
Hide file tree
Showing 30 changed files with 1,704 additions and 1 deletion.
12 changes: 12 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
*.cjs
*.cjs.map
*.mjs
*.mjs.map

# Logs
logs
*.log
Expand Down Expand Up @@ -102,3 +107,10 @@ dist

# TernJS port file
.tern-port

# IDE
.idea
*.iml

# OS
.DS_Store
4 changes: 4 additions & 0 deletions .husky/commit-msg
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
#!/bin/sh
. "$(dirname "$0")/_/husky.sh"

npm run commit-msg
4 changes: 4 additions & 0 deletions .husky/pre-commit
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
#!/bin/sh
. "$(dirname "$0")/_/husky.sh"

npm run pre-commit
36 changes: 35 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,2 +1,36 @@
# datastream
Common used stream patterns for Web Streams API and NodeJS Stream.
Commonly used stream patterns for Web Streams API and NodeJS Stream.
- `@datastream/core`
- pipeline
- pipejoin
- streamToArray
- streamToString
- isReadable
- isWritable
- makeOptions
- createReadableStream
- createTransformStream
- createWritableStream

## Streams

- Readable: The start of a pipeline of streams that injects data into a stream.
- PassThrough: Does not modify the data, but listens to the data and prepares a result that can be retrieved.
- Transform: Modifies data as it passes through.
- Writable: The end of a pipeline of streams that stores data from the stream.

### Basics
- `@datastream/string`
- stringReadableStream [Readable]
- stringLengthStream [PassThrough]
- stringOutputStream [PassThrough]
- `@datastream/object`
- objectReadableStream [Readable]
- objectCountStream [PassThrough]
- objectBatchStream [Transform]
- objectOutputStream [PassThrough]

### Common
- `@datastream/digest`
- digestStream [PassThrough]

24 changes: 24 additions & 0 deletions bin/esbuild
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
#!/usr/bin/env sh

for package in packages/*; do
modules=$(jq -r '.exports | keys[] as $k | "\($k)"' ${package}/package.json)
#length=${#modules[@]}
for module in $modules; do
if [ "${module}" == "." ]; then
module=index
fi
if [ -f ${package}/${module}.js ]; then
node_modules/.bin/esbuild --platform=node --target=node18 --format=cjs ${package}/${module}.js --sourcemap=external --allow-overwrite --outfile=${package}/${module}.node.cjs
node_modules/.bin/esbuild --platform=node --target=node18 --format=esm ${package}/${module}.js --sourcemap=external --allow-overwrite --outfile=${package}/${module}.node.mjs
node_modules/.bin/esbuild --platform=browser --format=esm ${package}/${module}.js --sourcemap=external --allow-overwrite --outfile=${package}/${module}.web.mjs
fi
if [ -f ${package}/${module}.node.js ]; then
node_modules/.bin/esbuild --platform=node --target=node18 --format=cjs ${package}/${module}.node.js --sourcemap=external --allow-overwrite --outfile=${package}/${module}.node.cjs
node_modules/.bin/esbuild --platform=node --target=node18 --format=esm ${package}/${module}.node.js --sourcemap=external --allow-overwrite --outfile=${package}/${module}.node.mjs
fi
if [ -f ${package}/${module}.web.js ]; then
node_modules/.bin/esbuild --platform=browser --format=esm ${package}/${module}.web.js --sourcemap=external --allow-overwrite --outfile=${package}/${module}.web.mjs
fi

done
done
7 changes: 7 additions & 0 deletions lerna.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
{
"packages": [
"packages/*"
],
"useNx": false,
"version": "0.0.0"
}
4 changes: 4 additions & 0 deletions lint-staged.config.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
export default {
'**/*.{json,yml}': ['prettier --write'],
'**/*.js': ['prettier --write', 'standard --fix']
}
100 changes: 100 additions & 0 deletions packages/core/index.node.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
import { Readable, Transform, PassThrough, Writable } from 'node:stream';
import { pipeline as pipelinePromise } from 'node:stream/promises';

export const pipeline = async (streams, { signal } = {}) => {

// Ensure stream ends with only writable
const lastStream = streams[streams.length - 1]
if (isReadable(lastStream)) {
streams.push(createWritableStream(() => {}, { signal, objectMode: lastStream._readableState.objectMode }))
}

await pipelinePromise(streams, { signal })

const output = {}
for (const stream of streams) {
if (typeof stream.result === 'function') {
const { key, value } = await stream.result()
output[key] = value
}
}

return output
}

export const pipejoin = (streams) => {
return streams.reduce((pipeline, stream, idx) => {
return pipeline.pipe(stream)
})
}

export const streamToArray = async (stream) => {
let value = []
for await (const chunk of stream) {
value.push(chunk)
}
return value
}

export const streamToString = async (stream) => {
let value = ''
for await (const chunk of stream) {
value += chunk
}
return value
}

/*export const streamToBuffer = async (stream) => {
let value = []
for await (const chunk of stream) {
value.push(Buffer.from(chunk))
}
return Buffer.concat(value)
}*/

export const isReadable = (stream) => {
return !!stream._readableState
}

export const isWritable = (stream) => {
return !!stream._writableState
}

export const makeOptions = ({highWaterMark, chunkSize, objectMode, ...options} = {}) => {
return {
writableHighWaterMark: highWaterMark,
writableObjectMode: objectMode,
readableObjectMode: objectMode,
readableHighWaterMark: highWaterMark,
highWaterMark,
chunkSize,
objectMode,
...options
}
}

export const createReadableStream = (data, options) => {
return Readable.from(data, options)
}

export const createTransformStream = (fn = () => {}, options) => {
return new Transform({
...makeOptions({objectMode: true, ...options}),
transform(chunk, encoding, callback) {
fn(chunk)
this.push(chunk)
callback()
}
})
}

export const createWritableStream = (fn = () => {}, options) => {
return new Writable({
objectMode: true,
...options,
write(chunk, encoding, callback) {
fn(chunk)
callback()
}
})
}
Loading

0 comments on commit 48cb52c

Please sign in to comment.