
Need Wallaroo help? Talk to me


Sean Allen
 

Hi everyone,

I hope this message finds you well. I wanted to let everyone on the mailing list know that I'm now doing Developer Relations at Wallaroo Labs. What does this mean for you? Well, if you need help getting started with Wallaroo or making a Wallaroo project successful, come talk to me.

I'd be thrilled to get emails from folks asking how Wallaroo can help solve their Python data processing problems. I'd love to dig into use cases with you. Whatever you need (within reason!), I'm here to help. Feel free to reach out either via the mailing list or to my email sean@....

-Sean-


 

Dear Sean,

I'd like to use Wallaroo for real-time surveillance of AIS data, i.e. marine vessels.
I have an IP address (153.44.253.27:5631) over which the Norwegian Coastal Administration is
transmitting live AIS tracks from AIS satellites.

Wallaroo so far is exciting and fun, hallmarks of Python of course, but I seem to have hit a roadblock.

To get my Wallaroo module to take input from that IP address, I use the following startup script:

#!/usr/bin/env python
import os
instruction=("machida "
             "--application-module working "
             "--in 153.44.253.27:5631 "
             "--out 127.0.0.1:7002 "
             "--metrics 127.0.0.1:5001 "
             "--control 127.0.0.1:6000 "
             "--data 127.0.0.1:6001 "
             "--name worker-name "
             "--external 127.0.0.1:5050 "
             "--cluster-initializer "
             "--ponythreads=1 "
             "--ponynoblock"
             )
os.system(instruction)

When run, this produces the following output (never mind the names; I'm using the GitHub examples as templates):

----Creating source Split and Count with | Split and Count source | ||----
Finished handling Split and Count node
Local topology initialized.
|^|^|^|Finished Initializing Local Topology|^|^|^|
---------------------------------------------------------
LocalTopologyInitializer.initialize called a second time. Ignoring since this is a single worker cluster.
initializer external: listening on 127.0.0.1:5050
metrics outgoing connected
Recovery file exists for control channel
initializer control: listening on 127.0.0.1:6000
|~~ INIT PHASE I: Application is created! ~~|
|~~ INIT PHASE II: Application is initialized! ~~|
TCPSink initializing connection to 127.0.0.1:7002
TCPSink connected
|~~ INIT PHASE III: Application is ready to work! ~~|
Need ClusterInitializer to inform that topology is ready
Split and Count source attempting to listen on 153.44.253.27:5631
Split and Count source is unable to listen
This should never happen: failure in /home/local/NTU/aud/wallaroo-tutorial/wallaroo-0.6.1/lib/wallaroo/core/source/tcp_source/tcp_source_listener.pony at line 304

All help is gratefully received.

Best,

Audun Stolpe

Senior Scientist,
The Norwegian Defence Research Establishment


Sean Allen
 

Hi Audun,

Nice to hear from you.

The error being reported is that 153.44.253.27 port 5631 is unavailable. Either something else is already listening on it, or it's not an IP address on the machine you are on.
The input address should be the IP address that Wallaroo will receive data on.
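
To illustrate the point about the input address, here is a minimal Python sketch (plain sockets, not Wallaroo code) showing that a listening socket can only be bound to an address that belongs to the local machine. The helper name `can_listen_on` is hypothetical, and 192.0.2.1 is a reserved documentation address used here as a stand-in for any remote host.

```python
import socket

def can_listen_on(host, port=0):
    """Return True if this machine can open a listening TCP socket on host."""
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        sock.bind((host, port))  # port 0 lets the OS pick any free port
        sock.listen(1)
        return True
    except OSError:
        # Raised when the address is not local or the port is already in use.
        return False
    finally:
        sock.close()
```

On a typical machine, `can_listen_on("127.0.0.1")` succeeds, while `can_listen_on("192.0.2.1")` fails for the same reason that listening on 153.44.253.27 does here.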

When you say:

"I have an IP address (153.44.253.27:5631) over which the Norwegian Coastal Administration is
transmitting live AIS tracks from AIS satellites."

What does "transmitting live" mean?

In particular, are you saying that something is already listening on that address and receiving data?
Are you saying that ip address is on another machine?

Also, what OS are you using? If Linux, please include distribution and version of said distribution.

-Sean-



On Thu, Jan 17, 2019 at 9:01 AM <audun.stolpe@...> wrote:


--
Sean T. Allen
VP of Engineering
WallarooLabs.com

Unlock Data's Potential.
Wallaroo Labs makes it simple to scale data applications efficiently, reliably, and on-demand - without worrying about infrastructure. Get to production fast, innovate rapidly, and operate at a low cost.

Please star us on GitHub.


 

Hello Audun,

I understand that the address you provided is a data stream provided by the Norwegian authorities and that it's not owned by you.
In a logical sense, it's the source of your data, but it's not a Wallaroo Source. A Wallaroo TCP Source is essentially a listening socket that's intelligently integrated with the rest of the system, and it lives as part of your running Wallaroo app. So it needs to be set up on an IP that you're running on (either 127.0.0.1 for local testing or your public (WAN/LAN) IP for networked operation).

In this case, you'll want to have a proxy process that listens to the official data source IP, and then forwards all packets to your Wallaroo Source. Something like:

1. Start your Machida application, but change the invocation to listen on a local port:
instruction=("machida "
             "--application-module working "
             "--in 127.0.0.1:8000 "

2. Start a proxy netcat process that will connect to the source and forward to our Wallaroo Source Listener
$ nc 153.44.253.27 5631 | nc 127.0.0.1 8000

You should see data coming in to your decoder function -- but it may not be framed correctly. That's a concern for our next steps, but first try the above and see if you're receiving any data into your Wallaroo app.

Good luck and please let us know if you get stuck!
Simon Zelazny







 

Thanks for responding so quickly.

The IP address is indeed on another machine.
The general problem is how to pipe a live stream from an external IP address into Wallaroo.
The particular stream in question collects AIS tracks reporting the position, velocity etc. of
sea-going vessels, pulled from receiving AIS satellites.

My Wallaroo module produces output when I feed it AIS from a file.
Yet when, *instead* of starting a separate sender, I try the suggested pipe, the result is:

nc -v 153.44.253.27 5631 | nc -v 127.0.0.1 7010
Connection to 127.0.0.1 7010 port [tcp/*] succeeded!
Connection to 153.44.253.27 5631 port [tcp/*] succeeded!

But, alas, no output.

Oh, and I'm using Ubuntu 18.04.


Audun


 

In fact, if I just do

nc 127.0.0.1 7010 (corresponding to the address specified as input source)

then my Wallaroo module acknowledges the connection, but no data I send over it gets through.

A


Sean Allen
 

Sorry, can you go into more detail what you did with `nc` including what other commands you ran and what happened?


On Fri, Jan 18, 2019 at 8:43 AM <audun.stolpe@...> wrote:


 

I started the Wallaroo module:

#!/usr/bin/env python
import os

instruction=("machida "
             "--application-module tester "
             "--in 127.0.0.1:7010 "
             "--out 127.0.0.1:7002 "
             "--metrics 127.0.0.1:5001 "
             "--control 127.0.0.1:6000 "
             "--data 127.0.0.1:6001 "
             "--name worker-name "
             "--external 127.0.0.1:5050 "
             "--cluster-initializer "
             "--ponythreads=1 "
             "--ponynoblock"
             )
os.system(instruction)

This uses a module that passes input through to output unchanged. Then I listen on 7002:

#!/usr/bin/env python
import os

instruction = ("data_receiver "
               "--ponythreads=1 "
               "--ponynoblock "
               "--listen 127.0.0.1:7002"
               )

os.system(instruction)

At this point, wallaroo reports:

>> |~~ INIT PHASE I: Application is created! ~~|
>> |~~ INIT PHASE II: Application is initialized! ~~|
>> TCPSink initializing connection to 127.0.0.1:7002
>> TCPSink connected
>> |~~ INIT PHASE III: Application is ready to work! ~~|
>> Inputter source attempting to listen on 127.0.0.1:7010
>> Inputter source is listening
>> initializer reported topology ready!
>> All 1 workers reporting Topology ready!
>> Application has successfully initialized.

This means that Wallaroo is listening on 7010 and that 7010 is correctly configured as a TCP input source, right?
I can now connect with netcat:

>> nc 127.0.0.1 7010

... and wallaroo confirms connection:

>> Inputter source: accepted a connection

Yet nothing I send over that connection is passed through by the Wallaroo module to 7002. For instance, if I redirect input from a file like this:

>> nc 127.0.0.1 8000 < text.txt

nothing happens. Any idea why that is? After all wallaroo confirms the connection as an input source.

A







Sean Allen
 



On Fri, Jan 18, 2019 at 9:39 AM <audun.stolpe@...> wrote:

Yet nothing I send over the connection is passed through by the Wallaroo module to 7002. For instance, I can redirect input from a file like this:

>> nc 127.0.0.1 8000 < text.txt


Is that the nc command you are using to send something into Wallaroo? If yes, that wouldn't work, as it's sending to port 8000.

If you are sending via nc to port 7010, then Wallaroo is receiving the data. The first question would be: are you sending a framed message?
Probably not if using nc.

The TcpSource accepts framed messages where there is:

4 byte header that indicates the length of the payload
Payload
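
As a rough illustration of that layout, here is a small Python sketch. The function names `frame` and `unframe` are hypothetical, and the big-endian byte order is an assumption based on the `">I"` format string that appears elsewhere in this thread.

```python
import struct

def frame(payload):
    """Prefix payload (bytes) with its length as a 4-byte big-endian integer."""
    return struct.pack(">I", len(payload)) + payload

def unframe(data):
    """Split the first framed message off data; return (payload, remaining bytes)."""
    (length,) = struct.unpack(">I", data[:4])
    return data[4:4 + length], data[4 + length:]
```

For example, `frame(b"hello")` yields the four header bytes `00 00 00 05` followed by `hello`.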

If you are interested in sending in a file, line by line, I'd suggest checking out the Giles Sender tool that comes with Wallaroo:



It can read a file and send it line by line adding the appropriate framing.


 

Sorry mistyped, the port was 7010 in both instances.
Thanks for the link I'll read up.

In the meantime I managed to hook up the stream by using
wallaroo.GenSourceConfig and then just popping one object off the stream at a time as a way of generating input.

Any reason why I shouldn't do it that way?



 

In the meantime I managed to hook up the stream by using
wallaroo.GenSourceConfig and then just popping one object off the stream at a time as a way of generating input.

Can you explain a bit further what you mean by this? What are you passing into wallaroo.GenSourceConfig?
wallaroo.GenSourceConfig is meant to accept a class which autogenerates messages for Wallaroo.

If possible, providing us with a GitHub Gist of your application might make it easier for us to help you out.

On Fri, Jan 18, 2019 at 9:54 AM <audun.stolpe@...> wrote:



 

Yes, I know that's the intended use of GenSourceConfig. I just tweaked the mode of generation to be that of
popping elements off a TCP stream.

I'll provide you the details later if necessary, but in order not to get sidetracked let me remind you of my original
general question: how does one redirect an external TCP stream to Wallaroo?

I read up on framed messages and the Giles script. But Giles seems to work only for files.
What I have is info continuously coming across a TCP socket, and I want to process it using Wallaroo. Do I need to frame
each individual element manually?

Audun


 



On Fri, Jan 18, 2019 at 4:29 PM <audun.stolpe@...> wrote:


 

Hi Audun

It's clear to me that what you want is a Connector. You'd have a connector process that opens a Python socket to the data stream and exposes that same stream to Wallaroo. I imagine it's not a far cry from your GenSource modification. The linked example is the general pattern you'll want to use, but instead of subscribing to a Redis topic, you'd just open a TCP socket to the broadcast server.

Secondly, regarding framing: the reason for the framing format is so that the decode function can be stateless. Otherwise you'd have to buffer extraneous data and decide when there's 'enough' to emit a business object. With the frame header, the decode function is only called once the number of bytes defined in the header has been ingested off the TCP socket. You won't have to worry about this if you go ahead and use a Connector.
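
To make the buffering point concrete, here is a rough Python sketch of the kind of bookkeeping a length header takes off the decode function. It is only an illustration, not Wallaroo's actual source implementation. The class name `FrameBuffer` is hypothetical, and a 4-byte big-endian header is assumed.

```python
import struct

class FrameBuffer:
    """Collects raw TCP bytes and releases only complete payloads."""

    def __init__(self):
        self._buf = b""

    def feed(self, chunk):
        """Add newly received bytes; return a list of complete payloads."""
        self._buf += chunk
        payloads = []
        while len(self._buf) >= 4:
            (length,) = struct.unpack(">I", self._buf[:4])
            if len(self._buf) < 4 + length:
                break  # the rest of this payload has not arrived yet
            payloads.append(self._buf[4:4 + length])
            self._buf = self._buf[4 + length:]
        return payloads
```

Each payload returned by `feed` is one whole message, so a decode function applied to it never has to deal with partial data, however the bytes were split across TCP reads.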

If you do want to use framing, an easy trick is to send your framed messages to the TCP Source like this:

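import socket
import struct

# Connect to the address given to Wallaroo via --in (7010 earlier in this thread).
sock = socket.create_connection(("127.0.0.1", 7010))
line = b"This data will be received in one go in the decode function"
# 4-byte big-endian length header, followed by the payload.
sock.sendall(struct.pack(">I", len(line)) + line)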
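
Combining the proxy idea from earlier in the thread with this framing trick, a relay could look roughly like the sketch below. It assumes the upstream feed is newline-delimited (an assumption; the thread does not say how the AIS stream is delimited), and the names `relay_lines` and `main` are hypothetical. The addresses are the ones mentioned in this thread.

```python
import socket
import struct

def relay_lines(upstream, downstream):
    """Forward each newline-terminated record from upstream as one framed message."""
    reader = upstream.makefile("rb")
    for line in reader:
        payload = line.rstrip(b"\r\n")
        if payload:  # skip empty lines
            downstream.sendall(struct.pack(">I", len(payload)) + payload)

def main():
    # Remote feed and local Wallaroo --in address, as used in this thread.
    upstream = socket.create_connection(("153.44.253.27", 5631))
    downstream = socket.create_connection(("127.0.0.1", 7010))
    try:
        relay_lines(upstream, downstream)
    finally:
        upstream.close()
        downstream.close()
```

Calling `main()` while the Wallaroo application is listening would play the role of the `nc ... | nc ...` pipe, with the difference that every record arrives with its length header.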


Cheers,
Simon

(P.S. Sorry about the blank email above -- still getting used to the new gmail interface)



On Fri, Jan 18, 2019 at 4:37 PM Simon Zelazny <simon@...> wrote:

