Own transaction when raising an event

This commit is contained in:
Uwe Klinger
2021-05-03 16:37:16 +02:00
parent eb4bc703dd
commit 98113c46fd
3 changed files with 35 additions and 14 deletions

View File

@@ -84,3 +84,9 @@ In case you've a question, find a bug, or otherwise need support, use our [commu
## License
Copyright (c) 2021 SAP SE or an SAP affiliate company. All rights reserved. This file is licensed under the Apache Software License, version 2.0 except as noted otherwise in the [LICENSE](LICENSE.txt) file.
## TODOs
1. Move `Suppliers` out of `API_BUSINESS_PARTNER` - remove additional lines in monkey patch --> Uwe
2. Fix problem with `await S4bupa.read (Suppliers).where('ID in',IDs)` --> Johannes
3.

View File

@@ -1,8 +1,9 @@
@server = http://localhost:4004
@bpServer = http://localhost:59847
@authAlice = Authorization: Basic alice:
PUT {{server}}/api-business-partner/A_BusinessPartner('ACME')
PATCH {{server}}/api-business-partner/A_BusinessPartner('ACME')
Content-Type: application/json
{

View File

@@ -26,26 +26,26 @@ module.exports = async()=>{ // called by server.js
admin.on (['CREATE','UPDATE'], 'Books', async ({data:{supplier_ID: supplierId}}, next) => {
// Using Promise.all(...) to parallelize local write, i.e. next(), and replication
/*
// ERROR: Reference integrity is violated for association "supplier"
if (supplierId) return Promise.all ([ next(), async()=>{
let replicated = await db.exists (Suppliers, supplierId)
if (!replicated) await replicate (supplierId, 'initial')
}])
else return next() //> don't forget to pass down the interceptor stack
*/
const replicateIfNotExists = async()=>{
let replicated = await db.exists (Suppliers, supplierId);
if (!replicated) await replicate (supplierId, 'initial');
};
let replicated = await db.exists (Suppliers, supplierId);
if (!replicated) await replicate (supplierId, 'initial');
return next();
if (supplierId) return Promise.all ([ next(), replicateIfNotExists() ])
else return next() //> don't forget to pass down the interceptor stack
})
})
// Subscribe to changes in the S4 origin of Suppliers data
// REVISIT: cds context is still from the UPDAT method when running in same programm, but should
// be a separate
S4bupa.on ('BusinessPartners/Changed', async msg => { //> would be great if we had batch events from S/4
let replicas = await SELECT('ID').from (Suppliers) .where ('ID in', msg.businessPartners)
return replicate (replicas.map(each => each.ID))
await new Promise( resolve => setTimeout( resolve, 1000 ));
const tx = cds.db.tx(msg);
let replicas = await tx.run(SELECT('ID').from (Suppliers) .where ('ID in', msg.data.businessPartners));
await replicateTx(replicas.map(each => each.ID), undefined, tx, msg);
await tx.commit();
})
/**
@@ -57,6 +57,7 @@ module.exports = async()=>{ // called by server.js
if (!Array.isArray(IDs)) IDs = [ IDs ]
// TODO: Doesn't work when running in same process with mocked API_BUSINESS_PARTNER
// TODO: Issue
let suppliers = await S4bupa.read (Suppliers).where(...([[]].concat(IDs).reduce( (where, id, index ) => { where.push(`${index>1 ? "OR ":""}ID = `, id); return where })));
//let suppliers = await S4bupa.read (Suppliers).where('ID in',IDs)
if (_initial) return db.insert (suppliers) .into (Suppliers) //> using bulk insert
@@ -65,4 +66,17 @@ module.exports = async()=>{ // called by server.js
))
}
async function replicateTx (IDs,_initial, tx, msg) {
if (!Array.isArray(IDs)) IDs = [ IDs ]
// TODO: Doesn't work when running in same process with mocked API_BUSINESS_PARTNER
// TODO: Issue
let suppliers = await S4bupa.tx(msg).read (Suppliers).where(...([[]].concat(IDs).reduce( (where, id, index ) => { where.push(`${index>1 ? "OR ":""}ID = `, id); return where })));
//let suppliers = await S4bupa.read (Suppliers).where('ID in',IDs)
if (_initial) return tx.insert (suppliers) .into (Suppliers) //> using bulk insert
else return Promise.all(suppliers.map ( //> parallelizing updates
each => tx.update (Suppliers,each.ID) .with (each)
))
}
}