Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion emulator.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,10 @@ func (s *Server) PurgeQueue(ctx context.Context, in *tasks.PurgeQueueRequest) (*

if s.Options.HardResetOnPurgeQueue {
// Use the development environment behaviour - synchronously purge the queue and release all task names
queue.HardReset(s)
err := queue.HardReset(s)
if err != nil {
return nil, err
}
} else {
// Mirror production behaviour - spin off an asynchronous purge operation and return
queue.Purge()
Expand Down
94 changes: 93 additions & 1 deletion emulator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -339,7 +339,6 @@ func TestPurgeQueueOptionallyPerformsHardReset(t *testing.T) {
require.NoError(t, err)

// In this mode, purging the queue is synchronous so we should be in the empty state straight away
time.Sleep(1 * time.Second)
assertTaskListIsEmpty(t, client, createdQueue)
assertGetTaskFails(t, grpcCodes.NotFound, client, createdTask.GetName())

Expand All @@ -361,6 +360,92 @@ func TestPurgeQueueOptionallyPerformsHardReset(t *testing.T) {
)
}

// TestPurgeQueueHardResetWaitsForRunningTask checks that, with HardResetOnPurgeQueue
// enabled, PurgeQueue succeeds for a task whose HTTP target takes 2s to respond, and
// that once it returns the task is gone and the target has already received the request.
func TestPurgeQueueHardResetWaitsForRunningTask(t *testing.T) {
	serv, client := setUp(t, ServerOptions{HardResetOnPurgeQueue: true})
	defer tearDown(t, serv)

	createdQueue := createTestQueue(t, client)
	defer tearDownQueue(t, client, createdQueue)

	srv, receivedRequests := startTestServer()
	defer srv.Shutdown(context.Background())

	// The target handler sleeps for the requested latency before responding.
	createTaskRequest := taskspb.CreateTaskRequest{
		Parent: createdQueue.GetName(),
		Task: &taskspb.Task{
			Name: createdQueue.GetName() + "/tasks/any-task",
			MessageType: &taskspb.Task_HttpRequest{
				HttpRequest: &taskspb.HttpRequest{
					Url: "http://localhost:5000/slow_running?latency=2s",
				},
			},
		},
	}
	createdTask, err := client.CreateTask(context.Background(), &createTaskRequest)
	require.NoError(t, err)

	// Now purge the queue
	purgeQueueRequest := taskspb.PurgeQueueRequest{
		Name: createdQueue.GetName(),
	}
	_, err = client.PurgeQueue(context.Background(), &purgeQueueRequest)
	require.NoError(t, err)

	// In this mode, purging the queue is synchronous so we should be in the empty state straight away
	assertTaskListIsEmpty(t, client, createdQueue)
	assertGetTaskFails(t, grpcCodes.NotFound, client, createdTask.GetName())

	// PurgeQueue shouldn't have returned until the task completed, so the test HTTP request
	// should already be available - hence the very short wait.
	receivedRequest, err := awaitHttpRequestWithTimeout(receivedRequests, 5*time.Millisecond)
	// Check the error explicitly rather than discarding it, so a failure reports its cause.
	require.NoError(t, err)
	require.NotNil(t, receivedRequest, "Request was received")

	// And the task should still not exist even after the emulator received the response
	assertTaskListIsEmpty(t, client, createdQueue)
	assertGetTaskFails(t, grpcCodes.NotFound, client, createdTask.GetName())
}

// TestPurgeQueueHardResetTimesOutWithSlowTask checks that, with HardResetOnPurgeQueue
// enabled, PurgeQueue fails with DeadlineExceeded while a task's HTTP target (4s latency)
// is still running, and that a retried PurgeQueue then succeeds and leaves the queue empty.
func TestPurgeQueueHardResetTimesOutWithSlowTask(t *testing.T) {
	serv, client := setUp(t, ServerOptions{HardResetOnPurgeQueue: true})
	defer tearDown(t, serv)

	createdQueue := createTestQueue(t, client)
	defer tearDownQueue(t, client, createdQueue)

	srv, receivedRequests := startTestServer()
	defer srv.Shutdown(context.Background())

	// The target handler sleeps for the requested latency before responding.
	createTaskRequest := taskspb.CreateTaskRequest{
		Parent: createdQueue.GetName(),
		Task: &taskspb.Task{
			Name: createdQueue.GetName() + "/tasks/any-task",
			MessageType: &taskspb.Task_HttpRequest{
				HttpRequest: &taskspb.HttpRequest{
					Url: "http://localhost:5000/slow_running?latency=4s",
				},
			},
		},
	}
	createdTask, err := client.CreateTask(context.Background(), &createTaskRequest)
	require.NoError(t, err)

	// Now purge the queue - first time will time out after 3 seconds because the task request takes 4 seconds
	purgeQueueRequest := taskspb.PurgeQueueRequest{
		Name: createdQueue.GetName(),
	}
	_, err = client.PurgeQueue(context.Background(), &purgeQueueRequest)
	assertIsGrpcError(t, "^Timed out waiting for tasks to be purged", grpcCodes.DeadlineExceeded, err)

	// We can retry - this will succeed because the task completes within our timeout
	_, err = client.PurgeQueue(context.Background(), &purgeQueueRequest)
	require.NoError(t, err, "Purge should succeed after request")

	// And we should now be in the empty state, with the request already executed
	assertTaskListIsEmpty(t, client, createdQueue)
	assertGetTaskFails(t, grpcCodes.NotFound, client, createdTask.GetName())
	receivedRequest, err := awaitHttpRequestWithTimeout(receivedRequests, 5*time.Millisecond)
	// Check the error explicitly rather than discarding it, so a failure reports its cause.
	require.NoError(t, err)
	require.NotNil(t, receivedRequest, "Request was received")
}

func TestSuccessTaskExecution(t *testing.T) {
serv, client := setUp(t, ServerOptions{})
defer tearDown(t, serv)
Expand Down Expand Up @@ -713,6 +798,13 @@ func startTestServer() (*http.Server, <-chan *http.Request) {
requestChannel <- r
})

mux.HandleFunc("/slow_running", func(w http.ResponseWriter, r *http.Request) {
latency, _ := time.ParseDuration(r.URL.Query().Get("latency"))
time.Sleep(latency)
w.WriteHeader(200)
requestChannel <- r
})

srv := &http.Server{Addr: "localhost:5000", Handler: mux}

go srv.ListenAndServe()
Expand Down
57 changes: 45 additions & 12 deletions queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (
pduration "github.com/golang/protobuf/ptypes/duration"

tasks "google.golang.org/genproto/googleapis/cloud/tasks/v2"
codes "google.golang.org/grpc/codes"
status "google.golang.org/grpc/status"
)

// Queue holds all internals for a task queue
Expand Down Expand Up @@ -236,7 +238,7 @@ func (queue *Queue) Purge() *sync.WaitGroup {
}

// Goes beyond `Purge` behaviour to synchronously delete all tasks and their name handles
func (queue *Queue) HardReset(s *Server) {
func (queue *Queue) HardReset(s *Server) error {
waitGroup := queue.Purge()
waitGroup.Wait()

Expand All @@ -245,22 +247,53 @@ func (queue *Queue) HardReset(s *Server) {
// - task.Delete() writes to a buffered `cancel` channel
// - task.Schedule() reads from that buffered channel in a separate goroutine
// - When that goroutine sees the task is cancelled, it sets the task value to nil in the tasks map
// - Additionally, if a task has already been dispatched then task.Delete() has no effect until after the current
// execution, which depends entirely on the response time of the task's target.
//
// We need to be certain that we only remove the task from map *after* that completes, otherwise the task name will
// be reinserted with the nil value. At the moment the only easy way I can think of is to sleep for a very short
// period to allow the tasks' internal goroutines to fire first.
time.Sleep(10 * time.Millisecond)
// be reinserted with the nil value.
isReadyChannel := make(chan bool, 1)
tryDeleteTasks := func() {
queue.tsMux.Lock()
defer queue.tsMux.Unlock()

queue.tsMux.Lock()
defer queue.tsMux.Unlock()
for taskName, task := range queue.ts {
if task != nil {
// The naive "sleep till it deletes" approach described above is too naive...
panic("Expected task to be deleted by now!")
hasAnyPending := false
for taskName, task := range queue.ts {
if task == nil {
// Task has already been deleted / ran to completion - safe to remove
delete(queue.ts, taskName)
s.hardDeleteTask(taskName)
} else {
// Task is still running (or the `cancel` channel has not fired) - will need to wait and retry
hasAnyPending = true
}
}
isReadyChannel <- !hasAnyPending
}

delete(queue.ts, taskName)
s.hardDeleteTask(taskName)
// The timeout applies across all iterations of the for loop.
// It is intentionally relatively short, because the internal retry interval is rapid.
// If calling code expects some task requests to last longer, it should handle the DEADLINE_EXCEEDED error and retry
// on a schedule to suit the application.
timeout := time.After(3 * time.Second)

Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be made configurable to avoid the complexity of having to handle this - especially since this is non-standard / undocumented behaviour?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The only thing is that, even if you want to wait longer, you probably don't want the emulator checking every 5ms for that whole time.

But it needs to be that fast at first, because with no tasks running I was finding updating the cancel channel etc could take 0-10ms so a longer retry interval would cause unnecessary delays.

I guess the better solution would be a basic exponential backoff. Could probably implement that easily enough with the simpler sync loop you suggested.

Then it would be fine to make this configurable - presumably as an extra CLI option?

Although it does then mean more parameters to validate - e.g. is it valid to set this option without enabling hard-reset mode? So it makes the emulator/emulator interface a bit more complex...

Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah I agree; tempted to say just keep it as you have it for now as it's non-core behaviour anyway. If there is a need I expect people will raise an issue and we can look at it then.


go tryDeleteTasks()

Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is there a need for a goroutine here? Since there is no asynchronous operation, and the intent is for the caller to wait anyway, I believe a simple loop with a time.Since(start) >= 3*time.Second check (where start is captured before the loop) can suffice.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably just my inexperience with Go threading/async. I suspect you're right, I'll give it a go with a simpler loop.


for {
select {
case isReady := <-isReadyChannel:
if isReady {
// All tasks have been purged
return nil
} else {
// One or more tasks is not yet deleted, wait and retry.
time.Sleep(5 * time.Millisecond)
go tryDeleteTasks()
}
case <-timeout:
log.Println("HardReset timed out waiting for tasks to clear")
return status.Errorf(codes.DeadlineExceeded, "Timed out waiting for tasks to be purged")

Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel this should be a custom error, and let the handler worry about the grpc response - what do you reckon?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just to confirm, do you mean return a (non-gRPC) error from the queue method and then let the emulator func convert that to the DEADLINE_EXCEEDED gRPC response? That seems reasonable.

If you mean a custom gRPC response code, I'm not sure how to do that.

Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah sorry, yes I meant the former (non-grpc from the queue method).

}
}
}

Expand Down
11 changes: 8 additions & 3 deletions readme.MD
Original file line number Diff line number Diff line change
Expand Up @@ -122,9 +122,14 @@ emulator - e.g. between each scenario in a test run.

The optional `hard-reset-on-purge-queue` flag configures the emulator so that calling `PurgeQueue`
will remove all record of past tasks. It also switches `PurgeQueue` to be a synchronous operation
which only returns once all tasks have been cancelled and the queue is empty. Queued tasks may, of
course, still fire during the PurgeQueue operation - but they cannot fire after PurgeQueue has
returned.
which only returns when:

* any tasks that are already running have completed,
* any tasks that are not yet running have been cancelled,
* and the queue is empty.

In this mode, `PurgeQueue` may fail with a `DEADLINE_EXCEEDED` error if a running task has not
completed within a few seconds. Your application code should handle this and retry as appropriate.

```
go run ./ --hard-reset-on-purge-queue
Expand Down