Skip to content

Commit

Permalink
Changes done for v4.1.4.
Browse files Browse the repository at this point in the history
  • Loading branch information
Senthil Nathan committed Apr 6, 2022
1 parent 8be0a5a commit 6053e90
Show file tree
Hide file tree
Showing 25 changed files with 527 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -433,6 +433,14 @@ it does safety checks and is therefore slower.</description>
</description>
<prototype>public stateful void dpsEndIteration(uint64 store, uint64 iterator, mutable uint64 err)</prototype>
</function>
<function>
<description>This function can be called to get all the keys present in a given store.
@param store the handle of the store.
@param keys a user provided mutable list variable. This list must be suitable for storing the data type of the keys.
@param err Contains the error code. Will be '0' if no error occurs, and a non-zero value otherwise.
</description>
<prototype>&lt;any T1> public stateful void dpsGetAllKeys(uint64 store, mutable list&lt;T1&gt; keys, mutable uint64 err)</prototype>
</function>
<function>
<description> This function serializes all the key-value pairs in a given store id into a blob. The blob can be used to recreate all the key-value pairs into another store. This is a useful technique for copying an entire store into a different store. See `dpsDeserialize()` for a detailed example on how to use the serialization functions to copy a store.
@param store the handle of the store to serialize.
Expand Down
3 changes: 2 additions & 1 deletion com.ibm.streamsx.dps/impl/include/CassandraDBLayer.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/*
# Licensed Materials - Property of IBM
# Copyright IBM Corp. 2011, 2014
# Copyright IBM Corp. 2011, 2022
# US Government Users Restricted Rights - Use, duplication or
# disclosure restricted by GSA ADP Schedule Contract with
# IBM Corp.
Expand Down Expand Up @@ -158,6 +158,7 @@ namespace distributed
bool acquireLock(uint64_t lock, double leaseTime, double maxWaitTimeToAcquireLock, PersistenceError & lkError);
bool removeLock(uint64_t lock, PersistenceError & lkError);
uint32_t getPidForLock(std::string const & name, PersistenceError & lkError);
void getAllKeys(uint64_t store, std::vector<unsigned char *> & keysBuffer, std::vector<uint32_t> & keysSize, PersistenceError & dbError);

};
} } } } }
Expand Down
3 changes: 2 additions & 1 deletion com.ibm.streamsx.dps/impl/include/CloudantDBLayer.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/*
# Licensed Materials - Property of IBM
# Copyright IBM Corp. 2011, 2014
# Copyright IBM Corp. 2011, 2022
# US Government Users Restricted Rights - Use, duplication or
# disclosure restricted by GSA ADP Schedule Contract with
# IBM Corp.
Expand Down Expand Up @@ -197,6 +197,7 @@ namespace distributed
bool acquireLock(uint64_t lock, double leaseTime, double maxWaitTimeToAcquireLock, PersistenceError & lkError);
bool removeLock(uint64_t lock, PersistenceError & lkError);
uint32_t getPidForLock(std::string const & name, PersistenceError & lkError);
void getAllKeys(uint64_t store, std::vector<unsigned char *> & keysBuffer, std::vector<uint32_t> & keysSize, PersistenceError & dbError);

};
} } } } }
Expand Down
3 changes: 2 additions & 1 deletion com.ibm.streamsx.dps/impl/include/CouchbaseDBLayer.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/*
# Licensed Materials - Property of IBM
# Copyright IBM Corp. 2011, 2015
# Copyright IBM Corp. 2011, 2022
# US Government Users Restricted Rights - Use, duplication or
# disclosure restricted by GSA ADP Schedule Contract with
# IBM Corp.
Expand Down Expand Up @@ -203,6 +203,7 @@ namespace distributed
bool acquireLock(uint64_t lock, double leaseTime, double maxWaitTimeToAcquireLock, PersistenceError & lkError);
bool removeLock(uint64_t lock, PersistenceError & lkError);
uint32_t getPidForLock(std::string const & name, PersistenceError & lkError);
void getAllKeys(uint64_t store, std::vector<unsigned char *> & keysBuffer, std::vector<uint32_t> & keysSize, PersistenceError & dbError);

};
} } } } }
Expand Down
4 changes: 4 additions & 0 deletions com.ibm.streamsx.dps/impl/include/DBLayer.h
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,10 @@ namespace store {
/// @return true if connection is active or false if connection is inactive.
virtual bool reconnect(std::set<std::string> & dbServers, PersistenceError & dbError) = 0;

/// Get all the keys present in a given store.
/// @return a list containing all the keys of a given data type.
virtual void getAllKeys(uint64_t store, std::vector<unsigned char *> & keysBuffer, std::vector<uint32_t> & keysSize, PersistenceError & dbError) = 0;

/// A store iterator
class Iterator
{
Expand Down
55 changes: 54 additions & 1 deletion com.ibm.streamsx.dps/impl/include/DistributedProcessStore.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/*
# Licensed Materials - Property of IBM
# Copyright IBM Corp. 2011, 2014
# Copyright IBM Corp. 2011, 2022
# US Government Users Restricted Rights - Use, duplication or
# disclosure restricted by GSA ADP Schedule Contract with
# IBM Corp.
Expand Down Expand Up @@ -218,6 +218,13 @@ namespace distributed
/// @param err PersistentStore error code
void endIteration(SPL::uint64 store, SPL::uint64 iterator, SPL::uint64 & err);

/// Get all the keys in a given store.
/// @param store store handle
/// @param List (vector) of a specific key type
/// @param err store error code
template<class T1>
void getAllKeysHelper(SPL::uint64 store, SPL::list<T1> & keys, SPL::uint64 & err);

/// Serialize the items from the serialized store
/// @param store store handle
/// @param data blob to serialize into
Expand Down Expand Up @@ -858,6 +865,52 @@ namespace distributed
return res;
}

/// Get all the keys in a given store.
/// @param store store handle
/// @param List (vector) of a specific key type
/// @param err store error code
template<class T1>
void DistributedProcessStore::getAllKeysHelper(SPL::uint64 store, SPL::list<T1> & keys, SPL::uint64 & err) {
dbError_->reset();
std::vector<unsigned char *> keysBuffer;
std::vector<uint32_t> keysSize;
// Call the underlying store implementation function to get all the keys in a given store.
db_->getAllKeys(store, keysBuffer, keysSize, *dbError_);
err = dbError_->getErrorCode();

if(err != 0) {
// We got an error. Return now without populating anything in the
// user provided list (vector). We will free any memory allocated
// for the partial set of keys returned if any.
for (unsigned int i = 0; i < keysBuffer.size(); i++) {
if(keysSize.at(i) > 0) {
// We must free the buffer allocated by the underlying implementation that we called above.
free(keysBuffer.at(i));
}
} // End of for loop.

return;
}

// We got all the keys.
// Let us convert it to the proper key type and store them in
// the user provided list (vector).
keys.clear();

// Populate the user provided list (vector) with the store keys.
for (unsigned int i = 0; i < keysBuffer.size(); i++) {
SPL::NativeByteBuffer nbf_key(keysBuffer.at(i), keysSize.at(i));
T1 tempKey;
nbf_key >> tempKey;
keys.push_back(tempKey);

if(keysSize.at(i) > 0) {
// We must free the buffer allocated by the underlying implementation that we called above.
free(keysBuffer.at(i));
}
} // End of for loop.
}

template<class T1, class T2>
void DistributedProcessStore::serialize(SPL::uint64 store, SPL::blob & data, SPL::uint64 & err)
{
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/*
# Licensed Materials - Property of IBM
# Copyright IBM Corp. 2011, 2014
# Copyright IBM Corp. 2011, 2022
# US Government Users Restricted Rights - Use, duplication or
# disclosure restricted by GSA ADP Schedule Contract with
# IBM Corp.
Expand Down Expand Up @@ -299,7 +299,17 @@ namespace distributed
{
return DistributedProcessStore::getGlobalStore().endIteration(store, iterator, err);
}


/// Get all the keys in a given store.
/// @param store store handle
/// @param List (vector) of a specific key type
/// @param err store error code
template<class T1>
void dpsGetAllKeys(SPL::uint64 store, SPL::list<T1> & keys, SPL::uint64 & err)
{
return DistributedProcessStore::getGlobalStore().getAllKeysHelper(store, keys, err);
}

/// Serialize the items from the serialized store
/// @param store store handle
/// @param data blob to serialize into
Expand Down
3 changes: 2 additions & 1 deletion com.ibm.streamsx.dps/impl/include/HBaseDBLayer.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/*
# Licensed Materials - Property of IBM
# Copyright IBM Corp. 2011, 2014
# Copyright IBM Corp. 2011, 2022
# US Government Users Restricted Rights - Use, duplication or
# disclosure restricted by GSA ADP Schedule Contract with
# IBM Corp.
Expand Down Expand Up @@ -212,6 +212,7 @@ namespace distributed
bool acquireLock(uint64_t lock, double leaseTime, double maxWaitTimeToAcquireLock, PersistenceError & lkError);
bool removeLock(uint64_t lock, PersistenceError & lkError);
uint32_t getPidForLock(std::string const & name, PersistenceError & lkError);
void getAllKeys(uint64_t store, std::vector<unsigned char *> & keysBuffer, std::vector<uint32_t> & keysSize, PersistenceError & dbError);

};
} } } } }
Expand Down
3 changes: 2 additions & 1 deletion com.ibm.streamsx.dps/impl/include/MemcachedDBLayer.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/*
# Licensed Materials - Property of IBM
# Copyright IBM Corp. 2011, 2014
# Copyright IBM Corp. 2011, 2022
# US Government Users Restricted Rights - Use, duplication or
# disclosure restricted by GSA ADP Schedule Contract with
# IBM Corp.
Expand Down Expand Up @@ -178,6 +178,7 @@ namespace distributed
bool acquireLock(uint64_t lock, double leaseTime, double maxWaitTimeToAcquireLock, PersistenceError & lkError);
bool removeLock(uint64_t lock, PersistenceError & lkError);
uint32_t getPidForLock(std::string const & name, PersistenceError & lkError);
void getAllKeys(uint64_t store, std::vector<unsigned char *> & keysBuffer, std::vector<uint32_t> & keysSize, PersistenceError & dbError);

};
} } } } }
Expand Down
3 changes: 2 additions & 1 deletion com.ibm.streamsx.dps/impl/include/MongoDBLayer.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/*
# Licensed Materials - Property of IBM
# Copyright IBM Corp. 2011, 2014
# Copyright IBM Corp. 2011, 2022
# US Government Users Restricted Rights - Use, duplication or
# disclosure restricted by GSA ADP Schedule Contract with
# IBM Corp.
Expand Down Expand Up @@ -172,6 +172,7 @@ namespace distributed
bool acquireLock(uint64_t lock, double leaseTime, double maxWaitTimeToAcquireLock, PersistenceError & lkError);
bool removeLock(uint64_t lock, PersistenceError & lkError);
uint32_t getPidForLock(std::string const & name, PersistenceError & lkError);
void getAllKeys(uint64_t store, std::vector<unsigned char *> & keysBuffer, std::vector<uint32_t> & keysSize, PersistenceError & dbError);

};
} } } } }
Expand Down
3 changes: 2 additions & 1 deletion com.ibm.streamsx.dps/impl/include/RedisClusterDBLayer.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/*
# Licensed Materials - Property of IBM
# Copyright IBM Corp. 2011, 2015
# Copyright IBM Corp. 2011, 2022
# US Government Users Restricted Rights - Use, duplication or
# disclosure restricted by GSA ADP Schedule Contract with
# IBM Corp.
Expand Down Expand Up @@ -181,6 +181,7 @@ namespace distributed
std::string const & baseUrl, std::string const & apiEndpoint, std::string const & queryParams,
std::string const & jsonRequest, std::string & jsonResponse, PersistenceError & dbError);
bool runDataStoreCommand(std::vector<std::string> const & cmdList, std::string & resultValue, PersistenceError & dbError);
void getAllKeys(uint64_t store, std::vector<unsigned char *> & keysBuffer, std::vector<uint32_t> & keysSize, PersistenceError & dbError);

// Lock related methods.
uint64_t createOrGetLock(std::string const & name, PersistenceError & lkError);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/*
# Licensed Materials - Property of IBM
# Copyright IBM Corp. 2011, 2020
# Copyright IBM Corp. 2011, 2022
# US Government Users Restricted Rights - Use, duplication or
# disclosure restricted by GSA ADP Schedule Contract with
# IBM Corp.
Expand Down Expand Up @@ -156,6 +156,7 @@ namespace distributed
std::string const & baseUrl, std::string const & apiEndpoint, std::string const & queryParams,
std::string const & jsonRequest, std::string & jsonResponse, PersistenceError & dbError);
bool runDataStoreCommand(std::vector<std::string> const & cmdList, std::string & resultValue, PersistenceError & dbError);
void getAllKeys(uint64_t store, std::vector<unsigned char *> & keysBuffer, std::vector<uint32_t> & keysSize, PersistenceError & dbError);

// Lock related methods.
uint64_t createOrGetLock(std::string const & name, PersistenceError & lkError);
Expand Down
4 changes: 2 additions & 2 deletions com.ibm.streamsx.dps/impl/include/RedisDBLayer.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/*
# Licensed Materials - Property of IBM
# Copyright IBM Corp. 2011, 2020
# Copyright IBM Corp. 2011, 2022
# US Government Users Restricted Rights - Use, duplication or
# disclosure restricted by GSA ADP Schedule Contract with
# IBM Corp.
Expand Down Expand Up @@ -175,14 +175,14 @@ namespace distributed
std::string const & baseUrl, std::string const & apiEndpoint, std::string const & queryParams,
std::string const & jsonRequest, std::string & jsonResponse, PersistenceError & dbError);
bool runDataStoreCommand(std::vector<std::string> const & cmdList, std::string & resultValue, PersistenceError & dbError);
void getAllKeys(uint64_t store, std::vector<unsigned char *> & keysBuffer, std::vector<uint32_t> & keysSize, PersistenceError & dbError);

// Lock related methods.
uint64_t createOrGetLock(std::string const & name, PersistenceError & lkError);
void releaseLock(uint64_t lock, PersistenceError & lkError);
bool acquireLock(uint64_t lock, double leaseTime, double maxWaitTimeToAcquireLock, PersistenceError & lkError);
bool removeLock(uint64_t lock, PersistenceError & lkError);
uint32_t getPidForLock(std::string const & name, PersistenceError & lkError);

};
} } } } }
#endif /* REDIS_DB_LAYER_H_ */
15 changes: 14 additions & 1 deletion com.ibm.streamsx.dps/impl/src/CassandraDBLayer.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/*
# Licensed Materials - Property of IBM
# Copyright IBM Corp. 2011, 2014
# Copyright IBM Corp. 2011, 2022
# US Government Users Restricted Rights - Use, duplication or
# disclosure restricted by GSA ADP Schedule Contract with
# IBM Corp.
Expand Down Expand Up @@ -2420,6 +2420,19 @@ namespace distributed
execute_query(session, statementString.c_str(), errorMsg);
}

// Senthil added this on Apr/06/2022.
// This method will get all the keys from the given store and
// populate them in the caller provided list (vector).
// Be aware of the time it can take to fetch all the keys in a store
// that has several tens of thousands of keys. In such cases, the caller
// has to maintain calm until we return back here.
void CassandraDBLayer::getAllKeys(uint64_t store, std::vector<unsigned char *> & keysBuffer, std::vector<uint32_t> & keysSize, PersistenceError & dbError) {
SPLAPPTRC(L_DEBUG, "Inside getAllKeys for store id " << store, "CassandraDBLayer");

// Not implemented at this time. Simply return.
return;
} // End of getAllKeys method.

CassandraDBLayerIterator::CassandraDBLayerIterator() {

}
Expand Down
16 changes: 15 additions & 1 deletion com.ibm.streamsx.dps/impl/src/CloudantDBLayer.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/*
# Licensed Materials - Property of IBM
# Copyright IBM Corp. 2011, 2014
# Copyright IBM Corp. 2011, 2022
# US Government Users Restricted Rights - Use, duplication or
# disclosure restricted by GSA ADP Schedule Contract with
# IBM Corp.
Expand Down Expand Up @@ -2294,6 +2294,20 @@ namespace distributed
deleteCloudantDocument(docUrl, curlReturnCode, curlErrorString, httpResponseCode, httpReasonString);
}

// Senthil added this on Apr/06/2022.
// This method will get all the keys from the given store and
// populate them in the caller provided list (vector).
// Be aware of the time it can take to fetch all the keys in a store
// that has several tens of thousands of keys. In such cases, the caller
// has to maintain calm until we return back here.
void CloudantDBLayer::getAllKeys(uint64_t store, std::vector<unsigned char *> & keysBuffer, std::vector<uint32_t> & keysSize, PersistenceError & dbError) {
SPLAPPTRC(L_DEBUG, "Inside getAllKeys for store id " << store, "CloudantDBLayer");

// Not implemented at this time. Simply return.
return;
} // End of getAllKeys method.


CloudantDBLayerIterator::CloudantDBLayerIterator() {

}
Expand Down
16 changes: 15 additions & 1 deletion com.ibm.streamsx.dps/impl/src/CouchbaseDBLayer.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/*
# Licensed Materials - Property of IBM
# Copyright IBM Corp. 2011, 2015
# Copyright IBM Corp. 2011, 2022
# US Government Users Restricted Rights - Use, duplication or
# disclosure restricted by GSA ADP Schedule Contract with
# IBM Corp.
Expand Down Expand Up @@ -3468,6 +3468,20 @@ namespace distributed
return;
}

// Senthil added this on Apr/06/2022.
// This method will get all the keys from the given store and
// populate them in the caller provided list (vector).
// Be aware of the time it can take to fetch all the keys in a store
// that has several tens of thousands of keys. In such cases, the caller
// has to maintain calm until we return back here.
void CouchbaseDBLayer::getAllKeys(uint64_t store, std::vector<unsigned char *> & keysBuffer, std::vector<uint32_t> & keysSize, PersistenceError & dbError) {
SPLAPPTRC(L_DEBUG, "Inside getAllKeys for store id " << store, "CouchbaseDBLayer");

// Not implemented at this time. Simply return.
return;
} // End of getAllKeys method.


CouchbaseDBLayerIterator::CouchbaseDBLayerIterator() {

}
Expand Down
15 changes: 14 additions & 1 deletion com.ibm.streamsx.dps/impl/src/HBaseDBLayer.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/*
# Licensed Materials - Property of IBM
# Copyright IBM Corp. 2011, 2014
# Copyright IBM Corp. 2011, 2022
# US Government Users Restricted Rights - Use, duplication or
# disclosure restricted by GSA ADP Schedule Contract with
# IBM Corp.
Expand Down Expand Up @@ -2721,6 +2721,19 @@ namespace distributed
deleteHBase_Column_CF_Row(url, curlReturnCode, curlErrorString, httpResponseCode, httpReasonString);
}

// Senthil added this on Apr/06/2022.
// This method will get all the keys from the given store and
// populate them in the caller provided list (vector).
// Be aware of the time it can take to fetch all the keys in a store
// that has several tens of thousands of keys. In such cases, the caller
// has to maintain calm until we return back here.
void HBaseDBLayer::getAllKeys(uint64_t store, std::vector<unsigned char *> & keysBuffer, std::vector<uint32_t> & keysSize, PersistenceError & dbError) {
SPLAPPTRC(L_DEBUG, "Inside getAllKeys for store id " << store, "HBaseDBLayer");

// Not implemented at this time. Simply return.
return;
} // End of getAllKeys method.

HBaseDBLayerIterator::HBaseDBLayerIterator() {

}
Expand Down
Loading

0 comments on commit 6053e90

Please sign in to comment.