Add new SAL endpoints and fix .gitignore

Change-Id: I18e6b03031158dd876a619fad5f37abf005fa261
This commit is contained in:
gsavvas 2024-02-20 10:51:14 +02:00
parent 988fedcb7f
commit 3e8dcd86eb
63 changed files with 307 additions and 38 deletions

2
.gitignore vendored
View File

@ -2,3 +2,5 @@ __pycache__/
.nox/
**/.gradle
**/config/appplication.yml
**/build

View File

@ -29,7 +29,8 @@ WORKDIR /app
COPY --from=builder /app/target/exn-middleware-core-0.0.1-SNAPSHOT.jar ./exn-middleware-core-0.0.1-SNAPSHOT.jar
#Copy application configuration
COPY config ./
RUN mkdir config
COPY config/application-production.yml ./config/application.yml
#Run java app on container start
CMD ["java", "-jar", "exn-middleware-core-0.0.1-SNAPSHOT.jar"]

View File

@ -0,0 +1,27 @@
logging:
level:
eu.nebulouscloud.exn: TRACE
spring:
profiles: default
main:
web-application-type: none
jms:
pub-sub-domain: true
application:
exn:
config:
url: 'nebulous-activemq'
port: '5672'
username: 'admin'
password: 'admin'
sal:
protocol: 'http'
host: 'sal'
port: '8080'
api: 'sal'
username: 'admin'
password: 'admin'
jms:
topic: 'eu.nebulouscloud'

View File

@ -12,7 +12,8 @@ spring:
application:
exn:
config:
url: 'nebulous-activemq'
# url: 'nebulous-activemq'
url: 'localhost'
port: '5672'
username: 'admin'
password: 'admin'

View File

@ -29,25 +29,37 @@ class ConfigureEXNConnector {
"exn.sal",
new EXNConnectorHandler(),
[
new Publisher("cloud.post","cloud.post.reply",true,false),
new Publisher("cloud.create","cloud.create.reply",true,false),
new Publisher("cloud.get","cloud.get.reply",true,false),
new Publisher("cloud.delete","cloud.delete.reply",true,false),
new Publisher("nodecandidate.post","nodecandidate.post.reply",true,false),
new Publisher("nodecandidate.rank","nodecandidate.rank.reply",true,false),
new Publisher("nodecandidate.get","nodecandidate.get.reply",true,false),
new Publisher("node.post","node.post.reply",true,false),
new Publisher("node.update","node.update.reply",true,false),
new Publisher("node.create","node.create.reply",true,false),
new Publisher("node.assign","node.assign.reply",true,false),
new Publisher("node.get","node.get.reply",true,false),
new Publisher("node.delete","node.delete.reply",true,false),
new Publisher("job.get","job.get.reply",true,false),
new Publisher("job.update","job.update.reply",true,false),
new Publisher("job.post","job.post.reply",true,false),
new Publisher("job.delete","job.delete.reply",true,false)
new Publisher("job.submit","job.submit.reply",true,false),
new Publisher("job.create","job.create.reply",true,false),
new Publisher("job.delete","job.delete.reply",true,false),
new Publisher("job.kill","job.kill.reply",true,false),
new Publisher("job.stop","job.stop.reply",true,false),
new Publisher("scale.in","scale.in.reply",true,false),
new Publisher("scale.out","scale.out.reply",true,false),
new Publisher("cluster.get","cluster.get.reply",true,false),
new Publisher("cluster.define","cluster.define.reply",true,false),
new Publisher("cluster.deploy","cluster.deploy.reply",true,false),
new Publisher("cluster.deployApplication","cluster.deployApplication.reply",true,false),
new Publisher("cluster.scaleout","cluster.scaleout.reply",true,false),
new Publisher("cluster.scalein","cluster.scalein.reply",true,false)
],
[
new Consumer("cloud","cloud.>", amqpSalMessageHandler,true,false),
new Consumer("nodecandidate","nodecandidate.>", amqpSalMessageHandler,true,false),
new Consumer("node","node.>", amqpSalMessageHandler,true,false),
new Consumer("job","job.>", amqpSalMessageHandler,true,false)
new Consumer("job","job.>", amqpSalMessageHandler,true,false),
new Consumer("scale","scale.>", amqpSalMessageHandler,true,false),
new Consumer("cluster","cluster.>", amqpSalMessageHandler,true,false)
],
false,
false,

View File

@ -1,6 +1,7 @@
package eu.nebulouscloud.exn.modules.sal.processors
import com.fasterxml.jackson.databind.ObjectMapper
import eu.nebulouscloud.exn.modules.sal.service.ActionResolveService
import org.apache.commons.lang3.StringUtils
import org.apache.qpid.protonj2.client.impl.ClientMessage
import org.slf4j.Logger
@ -23,6 +24,9 @@ abstract class AbstractProcessor implements Processor {
@Autowired
ObjectMapper mapper
@Autowired
ActionResolveService resolveService
@Override
Map process(String destination, ClientMessage message) {
@ -37,6 +41,8 @@ abstract class AbstractProcessor implements Processor {
logger.debug("[{}] Processing {}", metaData, o)
String method = destination.substring(destination.lastIndexOf(".") + 1)
method = resolveService.resolve(method,metaData)
try {
switch (method) {
@ -52,8 +58,12 @@ abstract class AbstractProcessor implements Processor {
case 'delete':
ret = delete(metaData, o)
break;
case 'create':
ret = create(metaData,o)
break
default:
ret = post(metaData,o)
ret.status = HttpStatus.NOT_ACCEPTABLE
ret.body = ["key": "gateway-server-exception-error", "message": 'Action '+method+' not supported']
}
} catch (HttpClientErrorException e) {
@ -84,7 +94,7 @@ abstract class AbstractProcessor implements Processor {
}
Map post(Map metaData, String body) { return noop(metaData, body) }
Map create(Map metaData, String body) { return noop(metaData, body) }
Map search(Map metaData, String body) { return noop(metaData, body) }
@ -98,4 +108,18 @@ abstract class AbstractProcessor implements Processor {
return ["status": HttpStatus.ACCEPTED.value(), "body": metaData?.user + " { " + body + "}"]
}
protected def normalizeResponse(def response){
if(response instanceof Boolean){
return ["success":response]
}
if(response instanceof Number){
return ["success":response ==0]
}
return response
}
}

View File

@ -24,7 +24,7 @@ class CloudProcessor extends AbstractProcessor{
SalConfiguration salConfiguration
@Override
Map post(Map metaData, String o){
Map create(Map metaData, String o){
def ret =[
"status": HttpStatus.OK.value(),
@ -48,7 +48,7 @@ class CloudProcessor extends AbstractProcessor{
return [
"status": HttpStatus.OK.value(),
"body": ["success": response == 0]
"body": normalizeResponse(response)
]
}
@ -103,7 +103,7 @@ class CloudProcessor extends AbstractProcessor{
return [
"status": HttpStatus.OK.value(),
"body": ["success":response]
"body": normalizeResponse(response)
]
}

View File

@ -0,0 +1,86 @@
package eu.nebulouscloud.exn.modules.sal.processors.impl
import eu.nebulouscloud.exn.modules.sal.configuration.SalConfiguration
import eu.nebulouscloud.exn.modules.sal.processors.AbstractProcessor
import eu.nebulouscloud.exn.modules.sal.repository.GatewayRepository
import eu.nebulouscloud.exn.modules.sal.repository.cloud.CloudRepository
import eu.nebulouscloud.exn.modules.sal.repository.cluster.ClusterRepository
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.http.HttpHeaders
import org.springframework.http.HttpStatus
import org.springframework.http.MediaType
import org.springframework.stereotype.Service
@Service
class ClusterProcessor extends AbstractProcessor{
@Autowired
ClusterRepository clusterRepository
@Autowired
GatewayRepository gatewayRepository
@Autowired
SalConfiguration salConfiguration
@Override
Map get(Map metaData, String o) {
logger.info('{} - Getting cluster {}',metaData.user, metaData.cluster)
String sessionId = gatewayRepository.login(salConfiguration.username,salConfiguration.password)
HttpHeaders headers = new HttpHeaders()
headers.add('sessionid',sessionId)
def response = clusterRepository.getById(metaData.clusterName as String,headers)
return [
"status": HttpStatus.OK.value(),
"body": normalizeResponse(response)
]
}
@Override
Map create(Map metaData, String o){
logger.info('{} - Posting cluster action {} with body {}',metaData?.user, metaData.action ,o)
String sessionId = gatewayRepository.login(salConfiguration.username,salConfiguration.password)
HttpHeaders headers = new HttpHeaders()
headers.add('sessionid',sessionId)
headers.setContentType(MediaType.APPLICATION_JSON)
def response = clusterRepository.postAction(o,metaData.action as String,headers,Object.class)
logger.info('Got response {}',response)
return [
"status": HttpStatus.OK.value(),
"body": normalizeResponse(response)
]
}
@Override
Map update(Map metaData, String o) {
logger.info('{} - Scaling cluster with body {}',metaData?.user, o)
String sessionId = gatewayRepository.login(salConfiguration.username,salConfiguration.password)
HttpHeaders headers = new HttpHeaders()
headers.add('sessionid',sessionId)
def response = clusterRepository.scale(o,metaData.action as String,headers, Object.class)
logger.info('Got response {}',response)
return [
"status": HttpStatus.OK.value(),
"body": normalizeResponse(response)
]
}
}

View File

@ -28,7 +28,7 @@ class JobProcessor extends AbstractProcessor{
Map<String, IJobDeleteStrategy> deleteStrategies
@Override
Map post(Map metaData, String o){
Map create(Map metaData, String o){
def ret =[
"status": HttpStatus.OK.value(),
@ -52,7 +52,7 @@ class JobProcessor extends AbstractProcessor{
return [
"status": HttpStatus.OK.value(),
"body": ["success":response]
"body": normalizeResponse(response)
]
}
@ -60,12 +60,7 @@ class JobProcessor extends AbstractProcessor{
@Override
Map get(Map metaData, String o) {
def ret =[
"status": HttpStatus.OK.value(),
"body": {}
]
logger.info('{} - Getting clouds {}',metaData?.user, o)
logger.info('{} - Getting jobs {}',metaData?.user, o)
//User Credentials for connecting to ProActive Server.
//SAL is a REST interface to PWS. Get it from UI or store behind the scenes ?
@ -85,7 +80,7 @@ class JobProcessor extends AbstractProcessor{
return [
"status": HttpStatus.OK.value(),
"body": response
"body": normalizeResponse(response)
]
}
@ -93,11 +88,6 @@ class JobProcessor extends AbstractProcessor{
@Override
Map delete(Map metaData, String o) {
def ret =[
"status": HttpStatus.OK.value(),
"body": {}
]
String jobId = metaData.jobId
String action = metaData.action
@ -155,7 +145,7 @@ class JobProcessor extends AbstractProcessor{
return [
"status": HttpStatus.OK.value(),
"body": response
"body": normalizeResponse(response)
]
}

View File

@ -40,7 +40,6 @@ class NodeCandidateProcessor extends AbstractProcessor{
headers.add('sessionid',sessionId)
headers.setContentType(MediaType.APPLICATION_JSON)
//Check jobId mentioned above
List response = nodeCandidateRepository.findCandidates(o,headers,List.class)
return [
@ -51,7 +50,7 @@ class NodeCandidateProcessor extends AbstractProcessor{
}
@Override
Map post(Map metaData, String o) {
Map update(Map metaData, String o) {
logger.info('{} - Ranking node candidates {}',metaData?.user, o)
@ -61,12 +60,11 @@ class NodeCandidateProcessor extends AbstractProcessor{
headers.add('sessionid',sessionId)
headers.setContentType(MediaType.APPLICATION_JSON)
//Check jobId mentioned above
List response = nodeCandidateRepository.rankCandidates(o,headers,Object.class)
def response = nodeCandidateRepository.rankCandidates(o,headers,Object.class)
return [
"status": HttpStatus.OK.value(),
"body": response
"body": normalizeResponse(response)
]
}

View File

@ -23,7 +23,7 @@ class NodeProcessor extends AbstractProcessor{
Map<String,NodeRegistrar> nodeRegistrarMap
@Override
Map post(Map metaData, String o){
Map create(Map metaData, String o){
def ret =[
"status": HttpStatus.OK.value(),
@ -53,7 +53,7 @@ class NodeProcessor extends AbstractProcessor{
return [
"status": HttpStatus.OK.value(),
"body": ["success": response == 0]
"body": normalizeResponse(response)
]
}
@ -127,7 +127,7 @@ class NodeProcessor extends AbstractProcessor{
return [
"status": HttpStatus.OK.value(),
"body": response
"body": normalizeResponse(response)
]
}

View File

@ -0,0 +1,57 @@
package eu.nebulouscloud.exn.modules.sal.processors.impl
import eu.nebulouscloud.exn.modules.sal.configuration.SalConfiguration
import eu.nebulouscloud.exn.modules.sal.processors.AbstractProcessor
import eu.nebulouscloud.exn.modules.sal.repository.GatewayRepository
import eu.nebulouscloud.exn.modules.sal.repository.cloud.CloudRepository
import eu.nebulouscloud.exn.modules.sal.repository.scale.ScaleRepository
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.http.HttpHeaders
import org.springframework.http.HttpStatus
import org.springframework.http.MediaType
import org.springframework.stereotype.Service
@Service
class ScaleProcessor extends AbstractProcessor{
@Autowired
ScaleRepository scaleRepository
@Autowired
GatewayRepository gatewayRepository
@Autowired
SalConfiguration salConfiguration
@Override
Map update(Map metaData, String o) {
logger.info('{} - Scaling job {} with task {} and action {} with body {}',metaData?.user, metaData.jobId, metaData.taskName, o)
String sessionId = gatewayRepository.login(salConfiguration.username,salConfiguration.password)
HttpHeaders headers = new HttpHeaders()
headers.add('sessionid',sessionId)
headers.setContentType(MediaType.APPLICATION_JSON)
String jobId = metaData.jobId
String taskName = metaData.taskName
if(!jobId || !taskName){
return [
"status": HttpStatus.BAD_REQUEST.value(),
"body": ["key":"not-job-or-task-definition","message":"JobId and TaskName cannot be null"]
]
}
//Check jobId mentioned above
Object response = scaleRepository.scale(o,jobId,taskName,metaData.action as String,headers, Object.class)
return [
"status": HttpStatus.OK.value(),
"body": normalizeResponse(response)
]
}
}

View File

@ -0,0 +1,26 @@
package eu.nebulouscloud.exn.modules.sal.repository.cluster
import eu.nebulouscloud.exn.modules.sal.repository.AbstractSalRepository
import org.springframework.http.HttpHeaders
import org.springframework.stereotype.Repository
@Repository
class ClusterRepository extends AbstractSalRepository{
ClusterRepository() {
super('cluster')
}
private final Map<String,String> SCALE_ACTION_MAPPING = [
'scaleout' : 'out',
'scalein' : 'in'
]
def postAction(String body, String action, HttpHeaders headers, Class responseType){
post(action,body,headers,responseType)
}
def scale(String body, String action, HttpHeaders headers, Class responseType){
post("scale/${SCALE_ACTION_MAPPING[action]}",body,headers,responseType)
}
}

View File

@ -0,0 +1,17 @@
package eu.nebulouscloud.exn.modules.sal.repository.scale
import eu.nebulouscloud.exn.modules.sal.repository.AbstractSalRepository
import org.springframework.http.HttpHeaders
import org.springframework.stereotype.Repository
@Repository
class ScaleRepository extends AbstractSalRepository{
ScaleRepository() {
super("scale")
}
def scale(String body, String jobId, String taskName, String action, HttpHeaders headers, Class responseType){
post("${jobId}/${taskName}/${action}",body,headers,responseType)
}
}

View File

@ -0,0 +1,28 @@
package eu.nebulouscloud.exn.modules.sal.service
import org.springframework.stereotype.Service
@Service
class ActionResolveService {
private final Map<List<String>,String> ACTION_MAPPING =[
['assign','submit','in','out','scaleout','scalein']:'update',
['define','deploy','deployApplication']:'create',
['delete','stop','kill'] : 'delete',
['rank'] : 'update'
]
String resolve(String method, Map metaData){
String match = ACTION_MAPPING.findResult {k,v -> k.contains(method) ? v : null }
//If matched then used method is treated as a metadata for the action
//stop: delete -> stop
//create: create -> create
if(match){
metaData.put('action',method)
}
return match ?: method
}
}