Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update api query improvement #48

Open
wants to merge 8 commits into
base: lt_test_load_gowtham
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -361,6 +361,14 @@ public void updateTask(TaskModel task) {
}
}

@Override
public void updateTasksInBatch(List<TaskModel> tasks) {
TaskModel task = tasks.get(0);
LOGGER.debug(
"Workflow Task removed from TASKS_IN_PROGRESS_STATUS with tasksInProgressKey: {}, workflowId: {}, taskId: {}, taskType: {}, taskStatus: {} during updateTask",
task.getTaskDefName(), task.getWorkflowInstanceId(), task.getTaskId(), task.getTaskType(), task.getStatus().name());
}

/**
* @method to verify the task status and update the task_in_progress table
* also removes if its a terminal task
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,12 @@ public interface ExecutionDAO {
*/
void updateTask(TaskModel task);


/**
* @param tasks Task to be updated
*/
void updateTasksInBatch(List<TaskModel> tasks);

/**
* Checks if the number of tasks in progress for the given taskDef will exceed the limit if the
* task is scheduled to be in progress (given to the worker or for system tasks start() method
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,18 @@ public void updateTask(TaskModel task) {
}
}

@Override
public void updateTasksInBatch(List<TaskModel> tasks) {
TaskModel task = tasks.get(0);
LOGGER.debug(
"Workflow Task removed from TASKS_IN_PROGRESS_STATUS with tasksInProgressKey: {}, workflowId: {}, taskId: {}, taskType: {}, taskStatus: {} during updateTask",
nsKey(IN_PROGRESS_TASKS, task.getTaskDefName()),
task.getWorkflowInstanceId(),
task.getTaskId(),
task.getTaskType(),
task.getStatus().name());
}

@Override
public boolean exceedsLimit(TaskModel task) {
Optional<TaskDef> taskDefinition = task.getTaskDefinition();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,7 @@ public void addTaskInProgress(TaskModel task) {
session.execute(
selectTaskInProgressStatement.bind(task.getTaskDefName(),
UUID.fromString(task.getTaskId())));
if (resultSet.all().isEmpty() || resultSet.all().size()<1) {
if (resultSet.isExhausted()) {
session.execute(
insertTaskInProgressStatement.bind(task.getTaskDefName(),
UUID.fromString(task.getTaskId()),
Expand All @@ -325,7 +325,6 @@ public void addTaskInProgress(TaskModel task) {
LOGGER.info("Task with defName {} and Id {} and status {} in addTaskInProgress NOT inserted as already exists "
,task.getTaskDefName(), task.getTaskId(),task.getStatus());
}

}

/**
Expand Down Expand Up @@ -382,6 +381,42 @@ public void updateTask(TaskModel task) {
}
}


@Override
public void updateTasksInBatch(List<TaskModel> tasks) {
BatchStatement batch = new BatchStatement();
try {
for (TaskModel task : tasks) {
Integer correlationId = Objects.isNull(task.getCorrelationId()) ? 0 : Integer.parseInt(task.getCorrelationId());
String taskPayload = toJson(task);

recordCassandraDaoRequests("updateTask", task.getTaskType(), task.getWorkflowType());
recordCassandraDaoPayloadSize("updateTask", taskPayload.length(), task.getTaskType(), task.getWorkflowType());
TaskModel prevTask = getTask(task.getTaskId());

LOGGER.debug("Received updateTask for task {} with taskStatus {} in workflow {} with taskRefName {} and prevTaskStatus {} ",
task.getTaskId(), task.getStatus(), task.getWorkflowInstanceId(), task.getReferenceTaskName(), prevTask.getStatus());

if (!prevTask.getStatus().equals(TaskModel.Status.COMPLETED)) {
batch.add(insertTaskStatement.bind(UUID.fromString(task.getWorkflowInstanceId()), correlationId, task.getTaskId(), taskPayload));

LOGGER.debug("Prepared updateTask for task {} with taskStatus {} with taskRefName {} for workflowId {} ", task.getTaskId(),
task.getStatus(), task.getReferenceTaskName(), task.getWorkflowInstanceId());
}
verifyTaskStatus(task);
}

// Execute the batch update
session.execute(batch);
LOGGER.debug("Batch update executed for {} tasks", tasks.size());
} catch (DriverException e) {
Monitors.error(CLASS_NAME, "updateTask");
String errorMsg = "Error updating tasks in batch for workflow.";
LOGGER.error(errorMsg, e);
throw new TransientException(errorMsg, e);
}
}

/**
* @method to verify the task status and update the task_in_progress table
* also removes if its a terminal task
Expand Down