Improved:

- Performance improved.
- Common logic is now used properly.
vorotamoroz 2023-01-17 18:14:12 +09:00
parent a2c5f3b3f1
commit 6232c27cc3
5 changed files with 622 additions and 1864 deletions
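In practice, this commit deletes the hand-rolled chunk-splitting and transfer code that lived in this tool and routes reads and writes through the shared putDBEntry/getDBEntry implementations in lib/src/LiveSyncDBFunctions, driven by a DBFunctionEnvironment object. A minimal sketch of the new call shape, condensed from the diff below (the env fields are abbreviated here; the full object appears in the diff):

    // Sketch only: condensed from the diff below, not a drop-in replacement.
    import { DBFunctionEnvironment, getDBEntry, putDBEntry } from "./lib/src/LiveSyncDBFunctions.js";
    import { id2path_base, path2id_base } from "./lib/src/path.js";

    declare const remote: PouchDB.Database; // the remote database opened per config entry

    const env = {
        localDatabase: remote, // the shared functions read and write through this database
        id2path: (s: string) => id2path_base(s),
        path2id: (s: string) => path2id_base(s),
        // settings, hashCaches, CollectChunks, h32/h32Raw: see the full object in the diff
    } as DBFunctionEnvironment;

    // Saving a file: the local putDBEntry(note, passphrase, ...) removed below becomes
    //     const ret = await putDBEntry(env, newNote, saveAsBigChunk);
    // Reading a file back:
    //     const doc = await getDBEntry(env, env.path2id(id), undefined, false, false, true);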

package-lock.json (generated, 2033 lines changed): diff suppressed because it is too large.

package.json

@@ -12,14 +12,14 @@
"license": "ISC",
"dependencies": {
"chokidar": "^3.5.3",
"pouchdb": "^7.2.2",
"pouchdb-adapter-http": "^7.2.2",
"pouchdb-adapter-leveldb": "^7.2.2",
"pouchdb-core": "^7.2.2",
"pouchdb-find": "^7.3.0",
"pouchdb-mapreduce": "^7.2.2",
"pouchdb-node": "^7.2.2",
"pouchdb-replication": "^7.2.2",
"pouchdb-adapter-http": "^8.0.0",
"pouchdb-adapter-idb": "^8.0.0",
"pouchdb-adapter-leveldb": "^8.0.0",
"pouchdb-core": "^8.0.0",
"pouchdb-find": "^8.0.0",
"pouchdb-mapreduce": "^8.0.0",
"pouchdb-replication": "^8.0.0",
"pouchdb-utils": "file:src/lib/src/patches/pouchdb-utils",
"transform-pouch": "^2.0.0",
"xxhash-wasm": "^1.0.1"
},
@@ -32,4 +32,4 @@
"ts-node": "^10.5.0",
"typescript": "^4.5.5"
}
}
}
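The dependency changes above follow the PouchDB 7 to 8 major bump: the pouchdb and pouchdb-node meta-packages are dropped in favour of the individual packages (plus the new pouchdb-adapter-idb), and pouchdb-utils is pinned to a local patched copy. A sketch of how such a set is typically assembled with PouchDB's standard plugin API; this is presumably what the project's pouchdb.js module, imported by the main source below, does:

    // Sketch: building a PouchDB constructor from the individual packages listed above.
    import PouchDB from "pouchdb-core";
    import HttpPouch from "pouchdb-adapter-http";
    import LevelPouch from "pouchdb-adapter-leveldb";
    import mapreduce from "pouchdb-mapreduce";
    import replication from "pouchdb-replication";
    import find from "pouchdb-find";

    // PouchDB.plugin() returns the constructor, so registrations can be chained.
    export const PouchDBAssembled = PouchDB
        .plugin(HttpPouch)
        .plugin(LevelPouch)
        .plugin(mapreduce)
        .plugin(replication)
        .plugin(find);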


@@ -5,16 +5,21 @@ import * as util from "util";
import { exec } from "child_process";
import { Stats } from "fs";
import { Logger } from "./logger.js";
import { Logger } from "./lib/src/logger";
//@ts-ignore
import { PouchDB as PouchDB_src } from "./pouchdb.js";
import { configFile, connectConfig, eachConf, TransferEntry } from "./types.js";
import { addKnownFile, addTouchedFile, calcDateDiff, DATEDIFF_EVEN, DATEDIFF_NEWER_A, DATEDIFF_OLDER_A, isKnownFile, isTouchedFile, path2unix } from "./util.js";
import { enableEncryption, runWithLock, shouldSplitAsPlainText, splitPieces2, isPlainText } from "./lib/src/utils.js";
import { configFile, connectConfig, eachConf } from "./types.js";
import { addKnownFile, addTouchedFile, calcDateDiff, DATEDIFF_EVEN, DATEDIFF_NEWER_A, DATEDIFF_OLDER_A, isKnownFile, isPlainText, isTouchedFile, path2unix } from "./util.js";
import { EntryDoc, Entry, EntryLeaf, LoadedEntry, NewEntry, PlainEntry, LOG_LEVEL, MAX_DOC_SIZE, MAX_DOC_SIZE_BIN, } from "./lib/src/types.js";
import { DBFunctionEnvironment, getDBEntry, putDBEntry } from "./lib/src/LiveSyncDBFunctions.js";
import { LRUCache } from "./lib/src/LRUCache.js";
import { enableEncryption } from "./lib/src/utils_couchdb.js";
import { id2path_base, path2id_base } from "./lib/src/path.js";
import { getDocData, getDocDataAsArray } from "./lib/src/utils.js";
import { arrayBufferToBase64, base64ToArrayBuffer } from "./lib/src/strbin.js";
import { logStore } from "./lib/src/stores";
const xxhash = require("xxhash-wasm");
@@ -35,6 +40,14 @@ function log(log: any) {
Logger(log, LOG_LEVEL.INFO);
}
logStore.subscribe((e) => {
const message = e.message;
const timestamp = new Date().toLocaleString();
const messagecontent = typeof message == "string" ? message : message instanceof Error ? `${message.name}:${message.message}` : JSON.stringify(message, null, 2);
const newmessage = timestamp + "->" + messagecontent;
console.log(newmessage);
});
function delay(ms: number): Promise<void> {
return new Promise((res) => setTimeout(() => res(), ms));
}
@@ -84,160 +97,6 @@ function triggerProcessor(procs: string) {
}
const hashCaches = new LRUCache();
async function putDBEntry(note: LoadedEntry, passphrase: string, saveAsBigChunk: boolean, customChunkSize: number, database: PouchDB.Database<NewEntry | PlainEntry | EntryLeaf>) {
// let leftData = note.data;
const savenNotes = [];
let processed = 0;
let made = 0;
let skiped = 0;
const maxChunkSize = MAX_DOC_SIZE_BIN * Math.max(customChunkSize, 1);
let pieceSize = maxChunkSize;
let plainSplit = false;
let cacheUsed = 0;
const userpasswordHash = h32Raw(new TextEncoder().encode(passphrase));
if (!saveAsBigChunk && shouldSplitAsPlainText(note._id)) {
pieceSize = MAX_DOC_SIZE;
plainSplit = true;
}
const minimumChunkSize = Math.min(Math.max(40, ~~(note.data.length / 100)), maxChunkSize);
if (pieceSize < minimumChunkSize) pieceSize = minimumChunkSize;
const newLeafs: EntryLeaf[] = [];
const pieces = splitPieces2(note.data, pieceSize, plainSplit, minimumChunkSize, 0);
for (const piece of pieces()) {
processed++;
let leafid = "";
// Get hash of piece.
let hashedPiece = "";
let hashQ = 0; // if hash collided, **IF**, count it up.
let tryNextHash = false;
let needMake = true;
const cache = hashCaches.get(piece);
if (cache) {
hashedPiece = "";
leafid = cache;
needMake = false;
skiped++;
cacheUsed++;
} else {
if (passphrase != "") {
// When encryption has been enabled, make hash to be different between each passphrase to avoid inferring password.
hashedPiece = "+" + (h32Raw(new TextEncoder().encode(piece)) ^ userpasswordHash).toString(16);
} else {
hashedPiece = h32(piece);
}
leafid = "h:" + hashedPiece;
do {
let nleafid = leafid;
try {
nleafid = `${leafid}${hashQ}`;
const pieceData = await database.get<EntryLeaf>(nleafid);
if (pieceData.type == "leaf" && pieceData.data == piece) {
leafid = nleafid;
needMake = false;
tryNextHash = false;
hashCaches.set(piece, leafid);
} else if (pieceData.type == "leaf") {
Logger("hash:collision!!");
hashQ++;
tryNextHash = true;
} else {
leafid = nleafid;
tryNextHash = false;
}
} catch (ex: any) {
if (ex.status && ex.status == 404) {
//not found, we can use it.
leafid = nleafid;
needMake = true;
tryNextHash = false;
} else {
needMake = false;
tryNextHash = false;
throw ex;
}
}
} while (tryNextHash);
if (needMake) {
//have to make
const savePiece = piece;
const d: EntryLeaf = {
_id: leafid,
data: savePiece,
type: "leaf",
};
newLeafs.push(d);
hashCaches.set(piece, leafid);
made++;
} else {
skiped++;
}
}
savenNotes.push(leafid);
}
let saved = true;
if (newLeafs.length > 0) {
try {
const result = await database.bulkDocs(newLeafs);
for (const item of result) {
if (!(item as any).ok) {
if ((item as any).status && (item as any).status == 409) {
// conflicted, but it would be ok in childrens.
} else {
Logger(`Save failed:id:${item.id} rev:${item.rev}`, LOG_LEVEL.NOTICE);
Logger(item);
saved = false;
}
}
}
if (saved) {
Logger(`Chunk saved:${newLeafs.length} chunks`);
}
} catch (ex) {
Logger("Chunk save failed:", LOG_LEVEL.NOTICE);
Logger(ex, LOG_LEVEL.NOTICE);
saved = false;
}
}
if (saved) {
Logger(`note content saven, pieces:${processed} new:${made}, skip:${skiped}, cache:${cacheUsed}`);
const newDoc: PlainEntry | NewEntry = {
children: savenNotes,
_id: note._id,
ctime: note.ctime,
mtime: note.mtime,
size: note.size,
type: note.datatype,
};
// Here for upsert logic,
return await runWithLock("file:" + newDoc._id, false, async () => {
try {
const old = await database.get(newDoc._id) as EntryDoc;
if (!old.type || old.type == "notes" || old.type == "newnote" || old.type == "plain") {
// simple use rev for new doc
newDoc._rev = old._rev;
}
} catch (ex: any) {
if (ex.status && ex.status == 404) {
// NO OP/
} else {
throw ex;
}
}
const r = await database.put<PlainEntry | NewEntry>(newDoc, { force: true });
Logger(`note saved:${newDoc._id}:${r.rev}`);
return r;
});
} else {
Logger(`note coud not saved:${note._id}`);
return false;
}
}
// Run synchronization for each config
async function eachProc(syncKey: string, config: eachConf) {
log(`${syncKey} started`);
@@ -253,7 +112,7 @@ async function eachProc(syncKey: string, config: eachConf) {
const remote = new PouchDB(serverURI, { auth: serverAuth });
if (serverAuth.passphrase != "") {
enableEncryption(remote as PouchDB.Database<EntryDoc>, serverAuth.passphrase);
enableEncryption(remote as PouchDB.Database<EntryDoc>, serverAuth.passphrase, false);
}
async function sanityCheck() {
@@ -274,6 +133,86 @@ async function eachProc(syncKey: string, config: eachConf) {
process.exit(-1);
}
const storagePathRoot = path.resolve(exportPath);
let conf: connectConfig = {
syncKey: syncKey,
fromDB: remote,
fromPrefix: serverPath,
passphrase: serverAuth.passphrase,
deleteMetadataOfDeletedFiles: deleteMetadataOfDeletedFiles,
customChunkSize: customChunkSize
};
const env: DBFunctionEnvironment = {
localDatabase: remote as PouchDB.Database<NewEntry | PlainEntry | EntryLeaf>,
id2path: function (filename: string): string {
return id2path_base(filename);
},
path2id: function (filename: string): string {
return path2id_base(filename);
},
isTargetFile: function (file: string): boolean {
if (file.includes(":")) return false;
return true;
},
settings: {
customChunkSize: customChunkSize,
deleteMetadataOfDeletedFiles: false,
encrypt: conf.passphrase.trim() != "",
passphrase: conf.passphrase,
minimumChunkSize: 20,
readChunksOnline: true,
},
corruptedEntries: {},
CollectChunks: async function (ids: string[], showResult?: boolean, waitForReady?: boolean): Promise<false | EntryLeaf[]> {
const allDocs = async function* () {
const limit = Math.max(10, Math.min(4000 / (conf.customChunkSize), 25));
const reqIds = [...ids];
do {
try {
const reqKeys = reqIds.splice(0, limit);
// console.log(`requesting (${reqKeys.length} / ${reqIds.length})`);
const chunks = await env.localDatabase.allDocs({ keys: reqKeys, include_docs: true });
yield chunks.rows
} catch (ex) {
return;
}
} while (reqIds.length > 0);
return;
}();
const chunkDocs = [];
for await (const v of allDocs) {
chunkDocs.push(...v);
}
const ret = [];
if (chunkDocs.some(e => "error" in e)) {
const missingChunks = chunkDocs.filter(e => "error" in e).map(e => e.id).join(", ");
Logger(`Could not retrieve chunks. Chunks are missing:${missingChunks}`, LOG_LEVEL.NOTICE);
return false;
}
if (chunkDocs.some(e => e.doc && e.doc.type != "leaf")) {
const missingChunks = chunkDocs.filter(e => e.doc && e.doc.type != "leaf").map(e => e.id).join(", ");
Logger(`Could not retrieve chunks. corrupted chunks::${missingChunks}`, LOG_LEVEL.NOTICE);
return false;
}
return chunkDocs.map(e => e.doc as EntryLeaf);
// throw new Error("Not implemented and not be called");
},
getDBLeaf(id: string, waitForReady: boolean): Promise<string> {
throw new Error("Not implemented and not be called (GetDBLeaf)");
},
hashCaches: hashCaches,
h32: function (input: string, seed?: number): string {
return h32(input, seed);
},
h32Raw: function (input: Uint8Array, seed?: number): number {
return h32Raw(input, seed);
}
};
function openConnection(e: connectConfig, auto_reconnect: boolean) {
Logger(`Connecting ${e.syncKey} with auto_reconnect:${auto_reconnect}`);
e.fromDB
@@ -288,7 +227,7 @@ async function eachProc(syncKey: string, config: eachConf) {
})
.on("change", async function (change) {
if (change.doc?._id.indexOf(":") == -1 && change.doc?._id.startsWith(e.fromPrefix) && isVaildDoc(change.doc._id)) {
let x = await transferDoc(e.syncKey, e.fromDB, change.doc, e.fromPrefix, e.passphrase, exportPath, deleteMetadataOfDeletedFiles);
let x = await transferDoc(e.syncKey, change.doc, e.fromPrefix, e.passphrase, exportPath, deleteMetadataOfDeletedFiles);
if (x) {
syncStat[syncKey] = change.seq + "";
triggerSaveStat();
@@ -320,15 +259,6 @@ async function eachProc(syncKey: string, config: eachConf) {
log("start vault watching");
const storagePathRoot = path.resolve(exportPath);
let conf: connectConfig = {
syncKey: syncKey,
fromDB: remote,
fromPrefix: serverPath,
passphrase: serverAuth.passphrase,
deleteMetadataOfDeletedFiles: deleteMetadataOfDeletedFiles,
customChunkSize: customChunkSize
};
function storagePathToVaultPath(strStoragePath: string) {
const rel = path.relative(storagePathRoot, strStoragePath);
@@ -357,11 +287,13 @@ async function eachProc(syncKey: string, config: eachConf) {
throw ex;
}
}
let content = "";
let content: string | string[] = "";
let datatype: "newnote" | "plain" = "newnote";
const d = await fs.readFile(pathSrc);
if (!isPlainText(pathSrc)) {
content = d.toString("base64");
const uint8arr = new Uint8Array(d.byteLength);
d.copy(uint8arr, 0, 0, d.byteLength);
content = await arrayBufferToBase64(uint8arr.buffer);
datatype = "newnote";
} else {
content = d.toString();
@@ -377,7 +309,7 @@ async function eachProc(syncKey: string, config: eachConf) {
data: content,
type: datatype,
};
let ret = await putDBEntry(newNote, conf.passphrase, saveAsBigChunk, customChunkSize, remote as PouchDB.Database<NewEntry | PlainEntry | EntryLeaf>);
let ret = await putDBEntry(env, newNote, saveAsBigChunk);
if (ret) {
addTouchedFile(pathSrc, 0);
addKnownFile(conf.syncKey, ret.id, ret.rev);
@@ -413,16 +345,62 @@ async function eachProc(syncKey: string, config: eachConf) {
return false;
}
}
async function pullFile(id: string, localPath: string, deleteMetadataOfDeletedFiles: boolean) {
let fromDoc = await remote.get(id);
async function pullFile(id: string, serverPath: string, localPath: string, deleteMetadataOfDeletedFiles: boolean) {
const fromDoc = await getDBEntry(env, env.path2id(id), undefined, false, false, true);
if (!fromDoc) {
log(`Failed to read file from database:${localPath}`);
return false;
}
const docName = fromDoc._id.substring(serverPath.length);
let sendDoc: PouchDB.Core.ExistingDocument<PouchDB.Core.ChangesMeta> & { children?: string[]; type?: string; mtime?: number } = { ...fromDoc, _id: docName.startsWith("_") ? "/" + docName : docName };
if (await exportDoc(sendDoc, docName, serverAuth.passphrase, remote, exportPath, deleteMetadataOfDeletedFiles)) {
log(`Pull:${localPath}`);
const sendDoc: LoadedEntry = {
...fromDoc,
_id: docName.startsWith("_") ? "/" + docName : docName
}
if (await exportDoc(env, sendDoc, docName, exportPath)) {
return true;
} else {
log(`Failed:${localPath}`);
return false;
}
}
async function transferDoc(syncKey: string, fromDoc: PouchDB.Core.ExistingDocument<PouchDB.Core.ChangesMeta>, fromPrefix: string, passphrase: string, exportPath: string, deleteMetadataOfDeletedFiles: boolean): Promise<boolean> {
const docKey = `${syncKey}: ${fromDoc._id} (${fromDoc._rev})`;
while (running[syncKey]) {
await delay(100);
}
try {
running[syncKey] = true;
if (isKnownFile(syncKey, fromDoc._id, fromDoc._rev)) {
return true;
}
log(`doc:${docKey} begin Transfer`);
let continue_count = 3;
try {
// const docName = fromDoc._id.substring(fromPrefix.length);
// let sendDoc: PouchDB.Core.ExistingDocument<PouchDB.Core.ChangesMeta> & { children?: string[]; type?: string; mtime?: number, deleted?: boolean } = { ...fromDoc, _id: docName.startsWith("_") ? "/" + docName : docName };
let retry = false;
do {
if (retry) {
continue_count--;
if (continue_count == 0) {
log(`doc:${docKey} retry failed`);
return false;
}
await delay(1500);
}
retry = !await pullFile(fromDoc._id, fromPrefix, exportPath, false);
} while (retry);
} catch (ex) {
log("Exception on transfer doc");
log(ex);
}
} finally {
running[syncKey] = false;
}
return false;
}
if (config.sync_on_connect || config.server.initialScan) {
const dbfiles = await remote.find({ limit: 999999999, selector: { $or: [{ type: "plain" }, { type: "newnote" }] }, fields: ["_id", "mtime"] });
@@ -451,14 +429,14 @@ async function eachProc(syncKey: string, config: eachConf) {
// return;
} else if (diff == DATEDIFF_OLDER_A) {
log(`<-- ${localPath}`);
await pullFile(doc._id, localPath, deleteMetadataOfDeletedFiles);
await pullFile(doc._id, serverPath, localPath, deleteMetadataOfDeletedFiles);
} else {
log(`=== ${localPath}`);
}
} catch (ex: any) {
if (ex.code == "ENOENT") {
log(`<<- ${localPath}`);
await pullFile(doc._id, localPath, deleteMetadataOfDeletedFiles);
await pullFile(doc._id, serverPath, localPath, deleteMetadataOfDeletedFiles);
// return;
continue;
} else {
@@ -517,26 +495,24 @@ async function eachProc(syncKey: string, config: eachConf) {
openConnection(conf, config.auto_reconnect ?? false);
}
async function getChildren(children: string[], db: PouchDB.Database) {
let items = await db.allDocs({ include_docs: true, keys: [...children] });
return items.rows.map((e) => e.doc);
}
function isVaildDoc(id: string): boolean {
if (id == "obsydian_livesync_version") return false;
if (id.indexOf(":") !== -1) return false;
return true;
}
async function exportDoc(sendDoc: TransferEntry, docName: string, passphrase: string, db: PouchDB.Database, exportPath: string, deleteMetadataOfDeletedFiles: boolean) {
async function exportDoc(env: DBFunctionEnvironment, sendDoc: LoadedEntry, docName: string, exportPath: string) {
const writePath = path.join(exportPath, docName);
if (sendDoc._deleted || sendDoc.deleted) {
log(`doc:${docName}: Deleted, so delete from ${writePath}`);
try {
addTouchedFile(writePath, 0);
await fs.unlink(writePath);
log(`doc:${docName}: Deleted, so delete from ${writePath}`);
} catch (ex: any) {
if (ex.code == "ENOENT") {
log(`doc:${docName}: Deleted, but already not exists on ${writePath}`);
//NO OP
} else {
throw ex;
@@ -544,10 +520,7 @@ async function exportDoc(sendDoc: TransferEntry, docName: string, passphrase: st
}
return true;
}
if (!sendDoc.children) {
log(`doc:${docName}: Warning! document doesn't have chunks, skipped`);
return false;
}
try {
const stat_init = await fs.stat(writePath);
const mtime = sendDoc.mtime ?? new Date().getTime();
@@ -562,86 +535,29 @@ async function exportDoc(sendDoc: TransferEntry, docName: string, passphrase: st
log(ex);
}
}
let cx = sendDoc.children;
let children = await getChildren(cx, db);
if (children.includes(undefined)) {
log(`doc:${docName}: Warning! there's missing chunks, skipped`);
return false;
}
children = children.filter((e) => !!e);
for (const v of children) {
delete (v as any)?._rev;
}
// let decrypted_children =
// passphrase == ""
// ? children
// : (
// await Promise.allSettled(
// children.map(async (e: any) => {
// e.data = await decrypt(e.data, passphrase);
// return e;
// })
// )
// ).map((e) => (e.status == "fulfilled" ? e.value : null));
const dirName = path.dirname(writePath);
log(`doc:${docName}: Exporting to ${writePath}`);
await fs.mkdir(dirName, { recursive: true });
const dt_plain = children.map((e: any) => e.data).join("");
const mtime = sendDoc.mtime ?? new Date().getTime();
addTouchedFile(writePath, mtime);
const tmtime = ~~(mtime / 1000);
if (sendDoc.type == "plain") {
await fs.writeFile(writePath, dt_plain);
await fs.writeFile(writePath, getDocData(sendDoc.data));
await fs.utimes(writePath, tmtime, tmtime);
} else {
const dt_bin = Buffer.from(dt_plain, "base64");
const buf = base64ToArrayBuffer(sendDoc.data)
const dt_bin = Buffer.from(buf) //new DataView(base64ToArrayBuffer(sendDoc.data));
await fs.writeFile(writePath, dt_bin, { encoding: "binary" });
await fs.utimes(writePath, tmtime, tmtime);
}
log(`doc:${docName}: Exported`);
return true;
}
async function transferDoc(syncKey: string, fromDB: PouchDB.Database, fromDoc: PouchDB.Core.ExistingDocument<PouchDB.Core.ChangesMeta>, fromPrefix: string, passphrase: string, exportPath: string, deleteMetadataOfDeletedFiles: boolean): Promise<boolean> {
const docKey = `${syncKey}: ${fromDoc._id} (${fromDoc._rev})`;
while (running[syncKey]) {
await delay(100);
}
try {
running[syncKey] = true;
if (isKnownFile(syncKey, fromDoc._id, fromDoc._rev)) {
return true;
}
log(`doc:${docKey} begin Transfer`);
let continue_count = 3;
try {
const docName = fromDoc._id.substring(fromPrefix.length);
let sendDoc: PouchDB.Core.ExistingDocument<PouchDB.Core.ChangesMeta> & { children?: string[]; type?: string; mtime?: number, deleted?: boolean } = { ...fromDoc, _id: docName.startsWith("_") ? "/" + docName : docName };
let retry = false;
do {
if (retry) {
continue_count--;
if (continue_count == 0) {
log(`doc:${docKey} retry failed`);
return false;
}
await delay(1500);
}
retry = !(await exportDoc(sendDoc, docName, passphrase, fromDB, exportPath, deleteMetadataOfDeletedFiles));
} while (retry);
} catch (ex) {
log("Exception on transfer doc");
log(ex);
}
} finally {
running[syncKey] = false;
}
return false;
}
async function main() {
log("FileSystem-Livesync starting up.");

@@ -1 +1 @@
Subproject commit 85bb3556ba90c054d0245ece5bf02bbe51f762a2
Subproject commit 133bae360798ee1513082cf728a59d96392bae6f

logger.ts

@@ -1,13 +0,0 @@
import { LOG_LEVEL } from "./types";
// eslint-disable-next-line require-await
export let Logger: (message: any, level?: LOG_LEVEL) => void = (message, _) => {
const timestamp = new Date().toLocaleString();
const messagecontent = typeof message == "string" ? message : message instanceof Error ? `${message.name}:${message.message}` : JSON.stringify(message, null, 2);
const newmessage = timestamp + "->" + messagecontent;
console.log(newmessage);
};
export function setLogger(loggerFun: (message: any, level?: LOG_LEVEL) => Promise<void>) {
Logger = loggerFun;
}
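Finally, the file deleted above is the tool's local logger: logging now goes through the shared lib/src/logger, with console output produced by the logStore subscription added in the main source (see the hunk near the top of this diff). A minimal sketch of that wiring, assuming logStore entries carry a message field as the subscription in the diff does:

    // Sketch of the new logging wiring, mirroring the subscription added in this commit.
    import { Logger } from "./lib/src/logger";
    import { logStore } from "./lib/src/stores";
    import { LOG_LEVEL } from "./lib/src/types.js";

    // Printing is now the subscriber's job; Logger() only feeds the store.
    logStore.subscribe((e) => {
        const body = typeof e.message == "string" ? e.message : JSON.stringify(e.message, null, 2);
        console.log(`${new Date().toLocaleString()}->${body}`);
    });

    Logger("FileSystem-Livesync starting up.", LOG_LEVEL.INFO); // printed by the subscriber above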